diff --git a/src/apps/api/routers/agents.py b/src/apps/api/routers/agents.py
index 559bfa1..dda663e 100644
--- a/src/apps/api/routers/agents.py
+++ b/src/apps/api/routers/agents.py
@@ -3,11 +3,12 @@
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, status
-from sqlalchemy import case, func, select
+from sqlalchemy import select
from sqlalchemy.orm import Session
from src.apps.api.deps import get_db
from src.packages.core.db.models import AgentRoleORM, ExecutionRunORM
+from src.packages.core.costs import estimate_cost
from src.packages.core.schemas import (
AgentCapabilityDeclaration,
AgentRoleDetailRead,
@@ -50,9 +51,10 @@ def _to_agent_detail(agent_role: AgentRoleORM) -> AgentRoleDetailRead:
def _to_agent_registry_item(
agent_role: AgentRoleORM,
*,
- total_runs: int,
- success_runs: int,
+ stats: dict[str, Any],
) -> AgentRegistryListItemRead:
+ total_runs = int(stats.get("total_runs", 0))
+ success_runs = int(stats.get("success_runs", 0))
success_rate = None
if total_runs > 0:
success_rate = round((success_runs / total_runs) * 100, 2)
@@ -70,6 +72,16 @@ def _to_agent_registry_item(
total_runs=total_runs,
success_runs=success_runs,
success_rate=success_rate,
+ average_latency_ms=stats.get("average_latency_ms"),
+ retry_rate=stats.get("retry_rate"),
+ average_prompt_tokens=stats.get("average_prompt_tokens", 0),
+ average_completion_tokens=stats.get("average_completion_tokens", 0),
+ average_total_tokens=stats.get("average_total_tokens", 0),
+ total_prompt_tokens=stats.get("total_prompt_tokens", 0),
+ total_completion_tokens=stats.get("total_completion_tokens", 0),
+ total_tokens=stats.get("total_tokens", 0),
+ average_cost_estimate=stats.get("average_cost_estimate", 0),
+ total_cost_estimate=stats.get("total_cost_estimate", 0),
)
@@ -182,31 +194,64 @@ def get_agent_registry(
) -> AgentRegistryResponse:
agent_roles = db.scalars(select(AgentRoleORM).order_by(AgentRoleORM.role_name)).all()
- run_stats_rows = db.execute(
- select(
- ExecutionRunORM.agent_role_id,
- func.count(ExecutionRunORM.id).label("total_runs"),
- func.coalesce(
- func.sum(case((ExecutionRunORM.run_status == "success", 1), else_=0)),
- 0,
- ).label("success_runs"),
+ runs = db.scalars(select(ExecutionRunORM).order_by(ExecutionRunORM.started_at.asc(), ExecutionRunORM.id.asc())).all()
+ run_stats_by_role_id: dict[str, dict[str, Any]] = {}
+ task_run_counts: dict[str, int] = {}
+ for run in runs:
+ task_run_counts[run.task_id] = task_run_counts.get(run.task_id, 0) + 1
+
+ for run in runs:
+ stats = run_stats_by_role_id.setdefault(
+ run.agent_role_id,
+ {
+ "total_runs": 0,
+ "success_runs": 0,
+ "latency_sum": 0,
+ "latency_count": 0,
+ "total_prompt_tokens": 0,
+ "total_completion_tokens": 0,
+ "total_tokens": 0,
+ "total_cost_estimate": 0.0,
+ "retry_runs": 0,
+ },
)
- .group_by(ExecutionRunORM.agent_role_id)
- ).all()
-
- run_stats_by_role_id: dict[str, dict[str, Any]] = {
- row.agent_role_id: {
- "total_runs": int(row.total_runs or 0),
- "success_runs": int(row.success_runs or 0),
- }
- for row in run_stats_rows
- }
+ stats["total_runs"] += 1
+ if run.run_status == "success":
+ stats["success_runs"] += 1
+ if run.latency_ms is not None:
+ stats["latency_sum"] += int(run.latency_ms)
+ stats["latency_count"] += 1
+ prompt_tokens = int(run.token_usage.get("prompt_tokens", 0) or 0)
+ completion_tokens = int(run.token_usage.get("completion_tokens", 0) or 0)
+ total_tokens = int(run.token_usage.get("total_tokens", 0) or 0)
+ if total_tokens == 0:
+ total_tokens = prompt_tokens + completion_tokens
+ stats["total_prompt_tokens"] += prompt_tokens
+ stats["total_completion_tokens"] += completion_tokens
+ stats["total_tokens"] += total_tokens
+ stats["total_cost_estimate"] += estimate_cost(run.token_usage)
+ if task_run_counts.get(run.task_id, 0) > 1:
+ stats["retry_runs"] += 1
+
+ for stats in run_stats_by_role_id.values():
+ total_runs = stats["total_runs"]
+ latency_count = stats["latency_count"]
+ stats["average_latency_ms"] = (
+ round(stats["latency_sum"] / latency_count, 2) if latency_count else None
+ )
+ stats["retry_rate"] = round((stats["retry_runs"] / total_runs) * 100, 2) if total_runs else None
+ stats["average_prompt_tokens"] = round(stats["total_prompt_tokens"] / total_runs, 2) if total_runs else 0
+ stats["average_completion_tokens"] = (
+ round(stats["total_completion_tokens"] / total_runs, 2) if total_runs else 0
+ )
+ stats["average_total_tokens"] = round(stats["total_tokens"] / total_runs, 2) if total_runs else 0
+ stats["total_cost_estimate"] = round(stats["total_cost_estimate"], 6)
+ stats["average_cost_estimate"] = round(stats["total_cost_estimate"] / total_runs, 6) if total_runs else 0
items = [
_to_agent_registry_item(
agent_role,
- total_runs=run_stats_by_role_id.get(agent_role.id, {}).get("total_runs", 0),
- success_runs=run_stats_by_role_id.get(agent_role.id, {}).get("success_runs", 0),
+ stats=run_stats_by_role_id.get(agent_role.id, {}),
)
for agent_role in agent_roles
]
diff --git a/src/apps/api/routers/runs.py b/src/apps/api/routers/runs.py
index 98902e0..ecb77b6 100644
--- a/src/apps/api/routers/runs.py
+++ b/src/apps/api/routers/runs.py
@@ -5,7 +5,9 @@
from sqlalchemy.orm import Session
from src.apps.api.deps import get_db
+from src.packages.core.costs import estimate_cost
from src.packages.core.db.models import AgentRoleORM, AssignmentORM, EventLogORM, ExecutionRunORM, TaskORM
+from src.packages.core.error_classification import classify_run_error
from src.packages.core.schemas import (
ExecutionRunRead,
RunDetailRead,
@@ -89,6 +91,13 @@ def get_run_detail(run_id: str, db: Session = Depends(get_db)) -> RunDetailRead:
for item in runs
],
events=[TaskEventRead.model_validate(event) for event in events],
+ cost_estimate=estimate_cost(run.token_usage),
+ error_category=classify_run_error(
+ run_status=run.run_status,
+ error_message=run.error_message,
+ logs=run.logs,
+ routing_reason=assignment.routing_reason if assignment is not None else None,
+ ),
)
diff --git a/src/apps/api/routers/task_batches.py b/src/apps/api/routers/task_batches.py
index c1a5e87..63e1ccd 100644
--- a/src/apps/api/routers/task_batches.py
+++ b/src/apps/api/routers/task_batches.py
@@ -19,10 +19,12 @@
TaskBatchORM,
TaskORM,
)
+from src.packages.core.error_classification import classify_task_error, summarize_failure_categories
from src.packages.core.schemas import (
BatchTimelineRead,
BatchArtifactRead,
BatchCountsRead,
+ FailureCategorySummaryRead,
BatchProgressRead,
BatchTaskResultRead,
TaskBatchListItemRead,
@@ -142,6 +144,36 @@ def _load_batch_artifacts(task_ids: list[str], db: Session) -> tuple[list[Artifa
return artifacts, artifact_counts
def _load_active_assignments(task_ids: list[str], db: Session) -> dict[str, AssignmentORM]:
    """Return the most recent assignment for each task id.

    Despite the "active" in the name, this simply picks the newest assignment
    per task (latest assigned_at, ties broken by id) — confirm with callers
    if "active" is meant to imply an additional status filter.
    """
    if not task_ids:
        return {}

    rows = db.scalars(
        select(AssignmentORM)
        .where(AssignmentORM.task_id.in_(task_ids))
        .order_by(AssignmentORM.assigned_at.desc(), AssignmentORM.id.desc())
    ).all()
    # Rows arrive newest-first, so the first row seen for a task wins.
    latest: dict[str, AssignmentORM] = {}
    for row in rows:
        if row.task_id not in latest:
            latest[row.task_id] = row
    return latest
+
+
def _load_latest_reviews(task_ids: list[str], db: Session) -> dict[str, ReviewCheckpointORM]:
    """Return the most recent review checkpoint for each task id.

    Newest checkpoint per task wins (latest created_at, ties broken by id).
    """
    if not task_ids:
        return {}

    rows = db.scalars(
        select(ReviewCheckpointORM)
        .where(ReviewCheckpointORM.task_id.in_(task_ids))
        .order_by(ReviewCheckpointORM.created_at.desc(), ReviewCheckpointORM.id.desc())
    ).all()
    # Rows arrive newest-first, so the first row seen for a task wins.
    latest: dict[str, ReviewCheckpointORM] = {}
    for row in rows:
        if row.task_id not in latest:
            latest[row.task_id] = row
    return latest
+
+
def _batch_updated_at(task_batch: TaskBatchORM, tasks: list[TaskORM]) -> datetime:
if not tasks:
return task_batch.created_at
@@ -454,6 +486,8 @@ def get_task_batch_summary(batch_id: str, db: Session = Depends(get_db)) -> Task
task_ids = [task.id for task in tasks]
latest_runs = _load_latest_runs(task_ids, db)
artifacts, artifact_counts = _load_batch_artifacts(task_ids, db)
+ latest_assignments = _load_active_assignments(task_ids, db)
+ latest_reviews = _load_latest_reviews(task_ids, db)
counts = _build_batch_counts(tasks)
progress = _build_batch_progress(tasks)
derived_status = _derive_batch_status(tasks)
@@ -471,6 +505,15 @@ def get_task_batch_summary(batch_id: str, db: Session = Depends(get_db)) -> Task
output_snapshot=latest_runs.get(task.id).output_snapshot if latest_runs.get(task.id) is not None else {},
error_message=latest_runs.get(task.id).error_message if latest_runs.get(task.id) is not None else None,
cancel_reason=latest_runs.get(task.id).cancel_reason if latest_runs.get(task.id) is not None else None,
+ error_category=classify_task_error(
+ task_status=task.status,
+ dependency_ids=task.dependency_ids,
+ run_status=latest_runs.get(task.id).run_status if latest_runs.get(task.id) is not None else None,
+ error_message=latest_runs.get(task.id).error_message if latest_runs.get(task.id) is not None else None,
+ logs=latest_runs.get(task.id).logs if latest_runs.get(task.id) is not None else None,
+ routing_reason=latest_assignments.get(task.id).routing_reason if latest_assignments.get(task.id) is not None else None,
+ review_reason=latest_reviews.get(task.id).reason if latest_reviews.get(task.id) is not None else None,
+ ),
artifact_count=artifact_counts.get(task.id, 0),
)
for task in tasks
@@ -496,4 +539,23 @@ def get_task_batch_summary(batch_id: str, db: Session = Depends(get_db)) -> Task
progress=progress,
tasks=task_results,
artifacts=artifact_reads,
+ failure_categories=[
+ FailureCategorySummaryRead.model_validate(item)
+ for item in summarize_failure_categories(
+ [
+ {
+ "task_id": task.task_id,
+ "error_category": task.error_category,
+ "error_message": task.error_message,
+ "cancel_reason": task.cancel_reason,
+ "routing_reason": (
+ latest_assignments.get(task.task_id).routing_reason
+ if latest_assignments.get(task.task_id) is not None
+ else None
+ ),
+ }
+ for task in task_results
+ ]
+ )
+ ],
)
diff --git a/src/apps/web/agents.js b/src/apps/web/agents.js
index e91144f..8fd30c7 100644
--- a/src/apps/web/agents.js
+++ b/src/apps/web/agents.js
@@ -22,6 +22,10 @@ function formatPercent(value) {
return `${value}%`;
}
function formatCurrency(value) {
  // Render a USD amount with six decimal places; null/undefined count as zero.
  const amount = Number(value ?? 0);
  return `$${amount.toFixed(6)}`;
}
+
function formatJson(value) {
const entries = value && typeof value === "object" ? Object.keys(value) : [];
if (!entries.length) {
@@ -127,6 +131,18 @@ function renderAgents(items) {
Success rate${escapeHtml(formatPercent(item.success_rate))}
+
+ - Avg latency
- ${escapeHtml(item.average_latency_ms ?? "n/a")} ms
+ - Retry rate
- ${escapeHtml(formatPercent(item.retry_rate))}
+ - Total tokens
- ${escapeHtml(item.total_tokens)}
+
+
+
+ - Avg total tokens
- ${escapeHtml(item.average_total_tokens)}
+ - Avg cost estimate
- ${escapeHtml(formatCurrency(item.average_cost_estimate))}
+ - Total cost estimate
- ${escapeHtml(formatCurrency(item.total_cost_estimate))}
+
+
Input schema
${formatJson(item.input_schema)}
diff --git a/src/apps/web/batch-detail.js b/src/apps/web/batch-detail.js
index be34bd5..85f7119 100644
--- a/src/apps/web/batch-detail.js
+++ b/src/apps/web/batch-detail.js
@@ -55,7 +55,8 @@ function renderOverview(summary) {
.join("");
}
-function renderRiskGroups(tasks) {
+function renderRiskGroups(summary) {
+ const tasks = summary.tasks;
const groups = [
{
key: "failed",
@@ -77,7 +78,14 @@ function renderRiskGroups(tasks) {
},
];
- const activeGroups = groups.filter((group) => group.items.length > 0);
+ const categoryGroups = (summary.failure_categories ?? []).map((item) => ({
+ key: item.category,
+ label: `Error category ยท ${item.category}`,
+ items: tasks.filter((task) => task.error_category === item.category),
+ description: `${item.count} task${item.count === 1 ? "" : "s"} currently grouped here.`,
+ }));
+
+ const activeGroups = [...groups.filter((group) => group.items.length > 0), ...categoryGroups];
if (!activeGroups.length) {
riskGroups.innerHTML = `No failed, blocked, or review-pending tasks in this batch.
`;
return;
@@ -201,6 +209,9 @@ function taskFlags(task) {
if (task.status === "failed" && task.error_message) {
flags.push("Latest run returned an error");
}
+ if (task.error_category) {
+ flags.push(`Error category: ${task.error_category}`);
+ }
return flags;
}
@@ -235,6 +246,11 @@ function renderTasks(tasks) {
agent ${escapeHtml(task.assigned_agent_role ?? "unassigned")}
latest run ${escapeHtml(task.latest_run_status ?? "not started")}
${escapeHtml(task.artifact_count)} artifacts
+ ${
+ task.error_category
+ ? `error ${escapeHtml(task.error_category)}`
+ : ""
+ }
${
task.latest_run_id
@@ -317,7 +333,7 @@ async function loadBatchDetail() {
]);
statusText.textContent = `Batch ${summary.batch.id} is currently ${summary.derived_status}.`;
renderOverview(summary);
- renderRiskGroups(summary.tasks);
+ renderRiskGroups(summary);
renderDependencyMap(summary.tasks);
renderArtifacts(summary.artifacts);
renderTimeline(timeline.items);
diff --git a/src/apps/web/run-detail.js b/src/apps/web/run-detail.js
index 86896df..27e6e30 100644
--- a/src/apps/web/run-detail.js
+++ b/src/apps/web/run-detail.js
@@ -64,6 +64,8 @@ function renderOverview(detail) {
["Prompt tokens", detail.run.token_usage?.prompt_tokens ?? 0],
["Completion tokens", detail.run.token_usage?.completion_tokens ?? 0],
["Total tokens", detail.run.token_usage?.total_tokens ?? 0],
+ ["Cost estimate", `$${Number(detail.cost_estimate ?? 0).toFixed(6)}`],
+ ["Error category", detail.error_category ?? "n/a"],
];
overviewMetrics.innerHTML = metrics
.map(([label, value]) => `${label}${escapeHtml(value)}`)
@@ -101,6 +103,11 @@ function renderErrorAndLogs(detail) {
${detail.run.run_status === "cancelled" ? "Cancel context" : "Error message"}
${escapeHtml(detail.run.error_message ?? detail.run.cancel_reason)}
+ ${
+ detail.error_category
+ ? `Error category: ${escapeHtml(detail.error_category)}
`
+ : ""
+ }
`;
} else {
diff --git a/src/packages/core/costs.py b/src/packages/core/costs.py
new file mode 100644
index 0000000..119bd48
--- /dev/null
+++ b/src/packages/core/costs.py
@@ -0,0 +1,13 @@
from __future__ import annotations

# Flat per-1K-token rates used for rough cost estimation across the console.
PROMPT_COST_PER_1K = 0.001
COMPLETION_COST_PER_1K = 0.002


def estimate_cost(token_usage: dict | None) -> float:
    """Return an approximate USD cost for *token_usage*, rounded to 6 places.

    None usage, missing keys, and falsy values (None/0/"") all count as zero
    tokens, so partial provider payloads are safe to pass in.
    """
    usage = token_usage or {}
    prompt_tokens = int(usage.get("prompt_tokens") or 0)
    completion_tokens = int(usage.get("completion_tokens") or 0)
    estimated = (prompt_tokens / 1000) * PROMPT_COST_PER_1K
    estimated += (completion_tokens / 1000) * COMPLETION_COST_PER_1K
    return round(estimated, 6)
diff --git a/src/packages/core/error_classification.py b/src/packages/core/error_classification.py
new file mode 100644
index 0000000..0721079
--- /dev/null
+++ b/src/packages/core/error_classification.py
@@ -0,0 +1,131 @@
+from __future__ import annotations
+
+
+ERROR_CATEGORIES = {
+ "timeout",
+ "validation_error",
+ "routing_error",
+ "dependency_blocked",
+ "execution_error",
+ "external_tool_error",
+}
+
+
+def _normalize_text_parts(*parts: str | None, logs: list[str] | None = None) -> str:
+ chunks = [part.strip().lower() for part in parts if part and part.strip()]
+ if logs:
+ chunks.extend(line.strip().lower() for line in logs if line and line.strip())
+ return "\n".join(chunks)
+
+
+def _is_timeout(text: str) -> bool:
+ return "timed out" in text or "timeout" in text or "time out" in text
+
+
+def _is_validation_error(text: str) -> bool:
+ return any(
+ marker in text
+ for marker in (
+ "validation",
+ "must be a non-empty string",
+ "output must be a dict",
+ "invalid input",
+ "invalid output",
+ )
+ )
+
+
+def _is_external_tool_error(text: str) -> bool:
+ return any(
+ marker in text
+ for marker in (
+ "subprocess",
+ "tool call",
+ "external tool",
+ "command failed",
+ "process exited",
+ "exit code",
+ )
+ )
+
+
+def _is_routing_error(text: str) -> bool:
+ return "no eligible agent role found" in text or "routing" in text and "no eligible" in text
+
+
def classify_run_error(
    *,
    run_status: str,
    error_message: str | None,
    logs: list[str] | None = None,
    routing_reason: str | None = None,
) -> str | None:
    """Map a failed run's textual context onto one error category.

    Returns None for any non-failed run. Checks are ordered from most to
    least specific; anything unrecognized falls back to "execution_error".
    """
    if run_status != "failed":
        return None

    normalized = _normalize_text_parts(error_message, routing_reason, logs=logs)
    # Order matters: e.g. "subprocess timed out" must classify as a timeout,
    # not an external tool error.
    checks = (
        (_is_timeout, "timeout"),
        (_is_validation_error, "validation_error"),
        (_is_external_tool_error, "external_tool_error"),
        (_is_routing_error, "routing_error"),
    )
    for predicate, category in checks:
        if predicate(normalized):
            return category
    return "execution_error"
+
+
def classify_task_error(
    *,
    task_status: str,
    dependency_ids: list[str] | None = None,
    run_status: str | None = None,
    error_message: str | None = None,
    logs: list[str] | None = None,
    routing_reason: str | None = None,
    review_reason: str | None = None,
) -> str | None:
    """Categorize a task-level failure from task, run, and review context.

    Priority: blocked tasks with dependencies first, then routing problems
    surfaced through assignment/review reasons, then the latest failed run's
    own classification. Returns None when nothing indicates a failure.
    """
    if task_status == "blocked" and (dependency_ids or []):
        return "dependency_blocked"

    context = _normalize_text_parts(routing_reason, review_reason)
    if task_status in {"needs_review", "failed"} and _is_routing_error(context):
        return "routing_error"

    if run_status != "failed":
        return None

    return classify_run_error(
        run_status=run_status,
        error_message=error_message,
        logs=logs,
        routing_reason=routing_reason,
    )
+
+
def summarize_failure_categories(
    task_items: list[dict[str, object]],
) -> list[dict[str, object]]:
    """Aggregate per-task error categories into summary rows.

    Each row carries the category, the number of tasks in it, every task id,
    and up to three distinct sample messages (first present of error message,
    cancel reason, or routing reason per task). Rows are ordered by
    descending count, then category name. Tasks without a category are
    skipped.
    """
    summaries: dict[str, dict[str, object]] = {}
    for item in task_items:
        category = item.get("error_category")
        if not category:
            continue
        summary = summaries.setdefault(
            category,
            {"category": category, "count": 0, "task_ids": [], "sample_messages": []},
        )
        summary["count"] = int(summary["count"]) + 1
        summary["task_ids"].append(item["task_id"])
        sample = item.get("error_message") or item.get("cancel_reason") or item.get("routing_reason")
        samples = summary["sample_messages"]
        if sample and sample not in samples and len(samples) < 3:
            samples.append(sample)
    return sorted(summaries.values(), key=lambda row: (-int(row["count"]), row["category"]))
diff --git a/src/packages/core/schemas.py b/src/packages/core/schemas.py
index 9fd0cbc..c683a07 100644
--- a/src/packages/core/schemas.py
+++ b/src/packages/core/schemas.py
@@ -102,6 +102,7 @@ class BatchTaskResultRead(SchemaModel):
output_snapshot: dict[str, Any] = Field(default_factory=dict)
error_message: str | None = None
cancel_reason: str | None = None
+ error_category: str | None = None
artifact_count: int = 0
@@ -115,6 +116,13 @@ class BatchArtifactRead(SchemaModel):
created_at: datetime
+class FailureCategorySummaryRead(SchemaModel):
+ category: str
+ count: int
+ task_ids: list[str] = Field(default_factory=list)
+ sample_messages: list[str] = Field(default_factory=list)
+
+
class TaskBatchSummaryRead(SchemaModel):
batch: TaskBatchRead
derived_status: str
@@ -122,6 +130,7 @@ class TaskBatchSummaryRead(SchemaModel):
progress: BatchProgressRead
tasks: list[BatchTaskResultRead]
artifacts: list[BatchArtifactRead]
+ failure_categories: list[FailureCategorySummaryRead] = Field(default_factory=list)
class TaskBatchListItemRead(SchemaModel):
@@ -283,6 +292,16 @@ class AgentRegistryListItemRead(SchemaModel):
total_runs: int = 0
success_runs: int = 0
success_rate: float | None = None
+ average_latency_ms: float | None = None
+ retry_rate: float | None = None
+ average_prompt_tokens: float = 0
+ average_completion_tokens: float = 0
+ average_total_tokens: float = 0
+ total_prompt_tokens: int = 0
+ total_completion_tokens: int = 0
+ total_tokens: int = 0
+ average_cost_estimate: float = 0
+ total_cost_estimate: float = 0
class AgentRegistryDiagnosisRead(SchemaModel):
@@ -362,6 +381,8 @@ class RunDetailRead(SchemaModel):
routing: RunRoutingRead
retry_history: list[RunRetryHistoryItemRead] = Field(default_factory=list)
events: list[TaskEventRead] = Field(default_factory=list)
+ cost_estimate: float = 0
+ error_category: str | None = None
class ReviewCheckpointCreate(SchemaModel):
diff --git a/src/tests/test_agent_registry_page.py b/src/tests/test_agent_registry_page.py
index 610c244..e741bec 100644
--- a/src/tests/test_agent_registry_page.py
+++ b/src/tests/test_agent_registry_page.py
@@ -9,6 +9,8 @@
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session
+from src.packages.core.db.models import ExecutionRunORM
+
ROOT = Path(__file__).resolve().parents[2]
TEST_ROLE_PREFIX = "registry-test-role-"
@@ -124,49 +126,23 @@ def _create_batch_with_task(client: TestClient, *, task_type: str, suffix: str)
def _insert_runs(task_id: str, agent_role_id: str, run_statuses: list[str]) -> None:
engine = create_engine(_database_url())
with Session(engine) as session:
- for run_status in run_statuses:
- session.execute(
- text(
- """
- INSERT INTO execution_runs (
- id,
- task_id,
- agent_role_id,
- run_status,
- started_at,
- finished_at,
- logs,
- input_snapshot,
- output_snapshot,
- error_message,
- cancelled_at,
- cancel_reason,
- token_usage,
- latency_ms
- ) VALUES (
- :id,
- :task_id,
- :agent_role_id,
- :run_status,
- NOW(),
- NOW(),
- ARRAY[]::text[],
- '{}'::jsonb,
- '{}'::jsonb,
- NULL,
- NULL,
- NULL,
- '{}'::jsonb,
- 100
- )
- """
- ),
- {
- "id": f"run_{uuid.uuid4().hex}",
- "task_id": task_id,
- "agent_role_id": agent_role_id,
- "run_status": run_status,
- },
+ for index, run_status in enumerate(run_statuses, start=1):
+ session.add(
+ ExecutionRunORM(
+ id=f"run_{uuid.uuid4().hex}",
+ task_id=task_id,
+ agent_role_id=agent_role_id,
+ run_status=run_status,
+ logs=[],
+ input_snapshot={},
+ output_snapshot={},
+ token_usage=(
+ {"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15}
+ if index == 1
+ else {"prompt_tokens": 20, "completion_tokens": 10, "total_tokens": 30}
+ ),
+ latency_ms=100 * index,
+ )
)
session.commit()
@@ -264,52 +240,10 @@ def _seed_registry_history(*, suffix: str, run_statuses: list[str]) -> dict:
"assigned_agent_role": role_name,
},
)
- for run_status in run_statuses:
- session.execute(
- text(
- """
- INSERT INTO execution_runs (
- id,
- task_id,
- agent_role_id,
- run_status,
- started_at,
- finished_at,
- logs,
- input_snapshot,
- output_snapshot,
- error_message,
- cancelled_at,
- cancel_reason,
- token_usage,
- latency_ms
- ) VALUES (
- :id,
- :task_id,
- :agent_role_id,
- :run_status,
- NOW(),
- NOW(),
- ARRAY[]::text[],
- '{}'::jsonb,
- '{}'::jsonb,
- NULL,
- NULL,
- NULL,
- '{}'::jsonb,
- 100
- )
- """
- ),
- {
- "id": f"run_{uuid.uuid4().hex}",
- "task_id": task_id,
- "agent_role_id": role_id,
- "run_status": run_status,
- },
- )
session.commit()
+ _insert_runs(task_id, role_id, run_statuses)
+
return role
@@ -342,6 +276,12 @@ def test_agent_registry_aggregates_run_history_and_success_rate() -> None:
assert registry_item["total_runs"] == 3
assert registry_item["success_runs"] == 1
assert registry_item["success_rate"] == 33.33
+ assert registry_item["average_latency_ms"] == 200.0
+ assert registry_item["retry_rate"] == 100.0
+ assert registry_item["total_tokens"] == 75
+ assert registry_item["average_total_tokens"] == 25.0
+ assert registry_item["total_cost_estimate"] == 0.0001
+ assert registry_item["average_cost_estimate"] == 0.000033
def test_agent_registry_diagnosis_distinguishes_enabled_disabled_and_missing_matches() -> None:
@@ -395,6 +335,9 @@ def test_agent_registry_reports_no_run_history_when_role_has_no_runs() -> None:
assert registry_item["total_runs"] == 0
assert registry_item["success_runs"] == 0
assert registry_item["success_rate"] is None
+ assert registry_item["average_latency_ms"] is None
+ assert registry_item["retry_rate"] is None
+ assert registry_item["total_cost_estimate"] == 0
def test_console_agent_registry_page_is_accessible() -> None:
@@ -408,5 +351,8 @@ def test_agent_registry_assets_include_success_rate_and_diagnosis_copy() -> None
response = client.get("/console/assets/agents.js")
assert response.status_code == 200
assert "Success rate" in response.text
+ assert "Avg latency" in response.text
+ assert "Retry rate" in response.text
+ assert "Total cost estimate" in response.text
assert "Why no suitable role?" in client.get("/console/agents").text
assert "No run history" in response.text
diff --git a/src/tests/test_batch_detail_page.py b/src/tests/test_batch_detail_page.py
index 253a1b1..0cd057e 100644
--- a/src/tests/test_batch_detail_page.py
+++ b/src/tests/test_batch_detail_page.py
@@ -101,7 +101,7 @@ def _batch_payload(task_type: str, suffix: str) -> dict:
_cleanup_database()
from src.apps.api.app import app # noqa: E402
-from src.packages.core.db.models import AssignmentORM, ExecutionRunORM, TaskORM # noqa: E402
+from src.packages.core.db.models import AssignmentORM, ExecutionRunORM, ReviewCheckpointORM, TaskORM # noqa: E402
client = TestClient(app)
@@ -152,6 +152,7 @@ def test_batch_detail_page_assets_include_batch_timeline() -> None:
asset_response = client.get("/console/assets/batch-detail.js")
assert asset_response.status_code == 200
assert "/task-batches/${batchId}/timeline" in asset_response.text
+ assert "Error category" in asset_response.text
def test_batch_detail_summary_supports_mixed_risk_sections() -> None:
@@ -188,6 +189,13 @@ def test_batch_detail_summary_supports_mixed_risk_sections() -> None:
output_snapshot={"step": "compile"},
)
)
+ session.add(
+ ReviewCheckpointORM(
+ task_id=third_task.id,
+ reason="No eligible agent role found for task_type=unmatched_type",
+ review_status="pending",
+ )
+ )
session.commit()
summary_response = client.get(f"/task-batches/{batch_id}/summary")
@@ -203,5 +211,13 @@ def test_batch_detail_summary_supports_mixed_risk_sections() -> None:
review_task = next(item for item in payload["tasks"] if item["task_id"] == task_ids[2])
assert failed_task["error_message"] == "detail page should highlight this failure"
+ assert failed_task["error_category"] == "execution_error"
assert blocked_task["dependency_ids"] == [task_ids[0]]
+ assert blocked_task["error_category"] == "dependency_blocked"
assert review_task["status"] == "needs_review"
+ assert review_task["error_category"] == "routing_error"
+ assert {item["category"] for item in payload["failure_categories"]} == {
+ "execution_error",
+ "dependency_blocked",
+ "routing_error",
+ }
diff --git a/src/tests/test_error_classification.py b/src/tests/test_error_classification.py
new file mode 100644
index 0000000..e8c279a
--- /dev/null
+++ b/src/tests/test_error_classification.py
@@ -0,0 +1,62 @@
from src.packages.core.error_classification import classify_run_error, classify_task_error


def test_classify_run_error_detects_timeout() -> None:
    # Timeout wording wins even though "subprocess" is also a tool marker.
    category = classify_run_error(
        run_status="failed",
        error_message="subprocess timed out after 30 seconds",
    )
    assert category == "timeout"


def test_classify_run_error_detects_validation_error() -> None:
    category = classify_run_error(
        run_status="failed",
        error_message="input_payload.text must be a non-empty string",
    )
    assert category == "validation_error"


def test_classify_task_error_detects_routing_error_from_review_context() -> None:
    # The routing failure is surfaced only via the review reason text.
    category = classify_task_error(
        task_status="needs_review",
        review_reason="No eligible agent role found for task_type=no_match",
    )
    assert category == "routing_error"


def test_classify_task_error_detects_dependency_blocked() -> None:
    category = classify_task_error(
        task_status="blocked",
        dependency_ids=["task_upstream"],
    )
    assert category == "dependency_blocked"


def test_classify_run_error_detects_external_tool_error() -> None:
    category = classify_run_error(
        run_status="failed",
        error_message="command failed with exit code 1",
        logs=["subprocess execution failed"],
    )
    assert category == "external_tool_error"


def test_classify_run_error_falls_back_to_execution_error() -> None:
    category = classify_run_error(
        run_status="failed",
        error_message="unexpected boom",
    )
    assert category == "execution_error"
diff --git a/src/tests/test_run_detail_page.py b/src/tests/test_run_detail_page.py
index fdad5e5..ff66049 100644
--- a/src/tests/test_run_detail_page.py
+++ b/src/tests/test_run_detail_page.py
@@ -183,9 +183,45 @@ def test_run_detail_endpoint_returns_routing_and_retry_history() -> None:
assert [item["run_status"] for item in payload["retry_history"]] == ["success", "failed"]
assert payload["retry_history"][0]["is_current"] is True
assert payload["run"]["token_usage"]["total_tokens"] == 8
+ assert payload["cost_estimate"] == 0.000011
+ assert payload["error_category"] is None
assert payload["events"][-1]["event_type"] == "run_completed"
def test_run_detail_endpoint_returns_error_category_for_failed_run() -> None:
    """A failed run whose text mentions a timeout surfaces error_category="timeout"."""
    suffix = uuid.uuid4().hex[:8]
    role_name = f"{TEST_PREFIX}timeout-{suffix}"
    _register_agent(client, role_name=role_name, supported_task_types=["generate"])
    create_response = client.post("/task-batches", json=_batch_payload("generate", suffix))
    assert create_response.status_code == 201
    task_id = create_response.json()["tasks"][0]["task_id"]

    # Seed a failed run directly; "timed out" in the message/logs should
    # classify as a timeout.
    engine = create_engine(_database_url())
    with Session(engine) as session:
        task = session.get(TaskORM, task_id)
        assert task is not None
        assignment = (
            session.query(AssignmentORM).filter(AssignmentORM.task_id == task.id).first()
        )
        assert assignment is not None
        failed_run = ExecutionRunORM(
            task_id=task.id,
            agent_role_id=assignment.agent_role_id,
            run_status="failed",
            error_message="tool timed out while waiting for response",
            logs=["command timed out"],
            input_snapshot={"attempt": 1},
            output_snapshot={},
        )
        session.add(failed_run)
        session.commit()
        run_id = failed_run.id

    detail_response = client.get(f"/runs/{run_id}/detail")
    assert detail_response.status_code == 200
    payload = detail_response.json()
    assert payload["run"]["run_status"] == "failed"
    assert payload["error_category"] == "timeout"
+
+
def test_run_detail_endpoint_handles_cancelled_run_without_logs() -> None:
suffix = uuid.uuid4().hex[:8]
_register_agent(client, role_name="default_worker", supported_task_types=[])
@@ -219,6 +255,7 @@ def test_run_detail_endpoint_handles_cancelled_run_without_logs() -> None:
assert payload["run"]["run_status"] == "cancelled"
assert payload["run"]["cancel_reason"] == "user requested cancellation"
assert payload["run"]["logs"] == []
+ assert payload["error_category"] is None
def test_console_run_detail_page_is_accessible() -> None:
@@ -236,6 +273,8 @@ def test_run_detail_page_assets_include_task_lifecycle_timeline() -> None:
asset_response = client.get("/console/assets/run-detail.js")
assert asset_response.status_code == 200
assert "/tasks/${detail.task.task_id}/timeline" in asset_response.text
+ assert "Cost estimate" in asset_response.text
+ assert "Error category" in asset_response.text
def test_batch_detail_assets_link_to_run_detail_page() -> None:
diff --git a/src/tests/test_task_batch_summary.py b/src/tests/test_task_batch_summary.py
index 70f337a..b1a3786 100644
--- a/src/tests/test_task_batch_summary.py
+++ b/src/tests/test_task_batch_summary.py
@@ -173,6 +173,9 @@ def test_summary_returns_needs_review_status_for_unrouted_batch() -> None:
assert payload["derived_status"] == "needs_review"
assert payload["counts"]["needs_review_count"] == 3
assert payload["progress"]["completed_count"] == 0
+ assert all(task["error_category"] == "routing_error" for task in payload["tasks"])
+ assert payload["failure_categories"][0]["category"] == "routing_error"
+ assert payload["failure_categories"][0]["count"] == 3
def test_summary_returns_running_status_when_task_is_in_progress() -> None:
@@ -275,4 +278,66 @@ def test_summary_aggregates_latest_run_and_artifacts() -> None:
assert task_summary["latest_run_status"] == "success"
assert task_summary["output_snapshot"] == {"version": 2}
assert task_summary["artifact_count"] == 1
+ assert task_summary["error_category"] is None
assert len(payload["artifacts"]) == 1
+
+
def test_summary_groups_failure_categories_from_latest_task_context() -> None:
    """Batch summary buckets tasks into validation / dependency / tool categories."""
    suffix = uuid.uuid4().hex[:8]
    _register_agent(client, role_name="default_worker", supported_task_types=[])
    create_response = client.post("/task-batches", json=_batch_payload("unmatched_type", suffix))
    assert create_response.status_code == 201
    batch_id = create_response.json()["batch_id"]
    task_ids = [task["task_id"] for task in create_response.json()["tasks"]]

    engine = create_engine(_database_url())
    with Session(engine) as session:
        tasks = [session.get(TaskORM, task_id) for task_id in task_ids]
        assert all(task is not None for task in tasks)
        first_task, second_task, third_task = tasks
        first_assignment = session.query(AssignmentORM).filter(AssignmentORM.task_id == first_task.id).first()
        third_assignment = session.query(AssignmentORM).filter(AssignmentORM.task_id == third_task.id).first()
        assert first_assignment is not None
        assert third_assignment is not None

        first_task.status = "failed"
        second_task.status = "blocked"
        third_task.status = "failed"

        # One validation failure and one external-tool failure.
        failed_runs = [
            (first_task, first_assignment, "input_payload.text must be a non-empty string", ["validation failed"]),
            (third_task, third_assignment, "command failed with exit code 1", ["subprocess call failed"]),
        ]
        for task, assignment, message, logs in failed_runs:
            session.add(
                ExecutionRunORM(
                    task_id=task.id,
                    agent_role_id=assignment.agent_role_id,
                    run_status="failed",
                    error_message=message,
                    logs=logs,
                    output_snapshot={},
                )
            )
        session.commit()

    summary_response = client.get(f"/task-batches/{batch_id}/summary")
    assert summary_response.status_code == 200
    payload = summary_response.json()

    by_task_id = {item["task_id"]: item for item in payload["tasks"]}
    assert by_task_id[task_ids[0]]["error_category"] == "validation_error"
    assert by_task_id[task_ids[1]]["error_category"] == "dependency_blocked"
    assert by_task_id[task_ids[2]]["error_category"] == "external_tool_error"

    category_counts = {item["category"]: item["count"] for item in payload["failure_categories"]}
    assert category_counts["validation_error"] == 1
    assert category_counts["dependency_blocked"] == 1
    assert category_counts["external_tool_error"] == 1