Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 68 additions & 23 deletions src/apps/api/routers/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
from typing import Any

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import case, func, select
from sqlalchemy import select
from sqlalchemy.orm import Session

from src.apps.api.deps import get_db
from src.packages.core.db.models import AgentRoleORM, ExecutionRunORM
from src.packages.core.costs import estimate_cost
from src.packages.core.schemas import (
AgentCapabilityDeclaration,
AgentRoleDetailRead,
Expand Down Expand Up @@ -50,9 +51,10 @@ def _to_agent_detail(agent_role: AgentRoleORM) -> AgentRoleDetailRead:
def _to_agent_registry_item(
agent_role: AgentRoleORM,
*,
total_runs: int,
success_runs: int,
stats: dict[str, Any],
) -> AgentRegistryListItemRead:
total_runs = int(stats.get("total_runs", 0))
success_runs = int(stats.get("success_runs", 0))
success_rate = None
if total_runs > 0:
success_rate = round((success_runs / total_runs) * 100, 2)
Expand All @@ -70,6 +72,16 @@ def _to_agent_registry_item(
total_runs=total_runs,
success_runs=success_runs,
success_rate=success_rate,
average_latency_ms=stats.get("average_latency_ms"),
retry_rate=stats.get("retry_rate"),
average_prompt_tokens=stats.get("average_prompt_tokens", 0),
average_completion_tokens=stats.get("average_completion_tokens", 0),
average_total_tokens=stats.get("average_total_tokens", 0),
total_prompt_tokens=stats.get("total_prompt_tokens", 0),
total_completion_tokens=stats.get("total_completion_tokens", 0),
total_tokens=stats.get("total_tokens", 0),
average_cost_estimate=stats.get("average_cost_estimate", 0),
total_cost_estimate=stats.get("total_cost_estimate", 0),
)


Expand Down Expand Up @@ -182,31 +194,64 @@ def get_agent_registry(
) -> AgentRegistryResponse:
agent_roles = db.scalars(select(AgentRoleORM).order_by(AgentRoleORM.role_name)).all()

run_stats_rows = db.execute(
select(
ExecutionRunORM.agent_role_id,
func.count(ExecutionRunORM.id).label("total_runs"),
func.coalesce(
func.sum(case((ExecutionRunORM.run_status == "success", 1), else_=0)),
0,
).label("success_runs"),
runs = db.scalars(select(ExecutionRunORM).order_by(ExecutionRunORM.started_at.asc(), ExecutionRunORM.id.asc())).all()
run_stats_by_role_id: dict[str, dict[str, Any]] = {}
task_run_counts: dict[str, int] = {}
for run in runs:
task_run_counts[run.task_id] = task_run_counts.get(run.task_id, 0) + 1

for run in runs:
stats = run_stats_by_role_id.setdefault(
run.agent_role_id,
{
"total_runs": 0,
"success_runs": 0,
"latency_sum": 0,
"latency_count": 0,
"total_prompt_tokens": 0,
"total_completion_tokens": 0,
"total_tokens": 0,
"total_cost_estimate": 0.0,
"retry_runs": 0,
},
)
.group_by(ExecutionRunORM.agent_role_id)
).all()

run_stats_by_role_id: dict[str, dict[str, Any]] = {
row.agent_role_id: {
"total_runs": int(row.total_runs or 0),
"success_runs": int(row.success_runs or 0),
}
for row in run_stats_rows
}
stats["total_runs"] += 1
if run.run_status == "success":
stats["success_runs"] += 1
if run.latency_ms is not None:
stats["latency_sum"] += int(run.latency_ms)
stats["latency_count"] += 1
prompt_tokens = int(run.token_usage.get("prompt_tokens", 0) or 0)
completion_tokens = int(run.token_usage.get("completion_tokens", 0) or 0)
total_tokens = int(run.token_usage.get("total_tokens", 0) or 0)
if total_tokens == 0:
total_tokens = prompt_tokens + completion_tokens
stats["total_prompt_tokens"] += prompt_tokens
stats["total_completion_tokens"] += completion_tokens
stats["total_tokens"] += total_tokens
stats["total_cost_estimate"] += estimate_cost(run.token_usage)
if task_run_counts.get(run.task_id, 0) > 1:
stats["retry_runs"] += 1

for stats in run_stats_by_role_id.values():
total_runs = stats["total_runs"]
latency_count = stats["latency_count"]
stats["average_latency_ms"] = (
round(stats["latency_sum"] / latency_count, 2) if latency_count else None
)
stats["retry_rate"] = round((stats["retry_runs"] / total_runs) * 100, 2) if total_runs else None
stats["average_prompt_tokens"] = round(stats["total_prompt_tokens"] / total_runs, 2) if total_runs else 0
stats["average_completion_tokens"] = (
round(stats["total_completion_tokens"] / total_runs, 2) if total_runs else 0
)
stats["average_total_tokens"] = round(stats["total_tokens"] / total_runs, 2) if total_runs else 0
stats["total_cost_estimate"] = round(stats["total_cost_estimate"], 6)
stats["average_cost_estimate"] = round(stats["total_cost_estimate"] / total_runs, 6) if total_runs else 0

items = [
_to_agent_registry_item(
agent_role,
total_runs=run_stats_by_role_id.get(agent_role.id, {}).get("total_runs", 0),
success_runs=run_stats_by_role_id.get(agent_role.id, {}).get("success_runs", 0),
stats=run_stats_by_role_id.get(agent_role.id, {}),
)
for agent_role in agent_roles
]
Expand Down
9 changes: 9 additions & 0 deletions src/apps/api/routers/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
from sqlalchemy.orm import Session

from src.apps.api.deps import get_db
from src.packages.core.costs import estimate_cost
from src.packages.core.db.models import AgentRoleORM, AssignmentORM, EventLogORM, ExecutionRunORM, TaskORM
from src.packages.core.error_classification import classify_run_error
from src.packages.core.schemas import (
ExecutionRunRead,
RunDetailRead,
Expand Down Expand Up @@ -89,6 +91,13 @@ def get_run_detail(run_id: str, db: Session = Depends(get_db)) -> RunDetailRead:
for item in runs
],
events=[TaskEventRead.model_validate(event) for event in events],
cost_estimate=estimate_cost(run.token_usage),
error_category=classify_run_error(
run_status=run.run_status,
error_message=run.error_message,
logs=run.logs,
routing_reason=assignment.routing_reason if assignment is not None else None,
),
)


Expand Down
62 changes: 62 additions & 0 deletions src/apps/api/routers/task_batches.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
TaskBatchORM,
TaskORM,
)
from src.packages.core.error_classification import classify_task_error, summarize_failure_categories
from src.packages.core.schemas import (
BatchTimelineRead,
BatchArtifactRead,
BatchCountsRead,
FailureCategorySummaryRead,
BatchProgressRead,
BatchTaskResultRead,
TaskBatchListItemRead,
Expand Down Expand Up @@ -142,6 +144,36 @@ def _load_batch_artifacts(task_ids: list[str], db: Session) -> tuple[list[Artifa
return artifacts, artifact_counts


def _load_active_assignments(task_ids: list[str], db: Session) -> dict[str, AssignmentORM]:
    """Return the most recent assignment for each task in *task_ids*.

    Rows are fetched newest-first (by assigned_at, then id), so the first
    row encountered for a task is its latest assignment.
    """
    if not task_ids:
        return {}

    ordered_rows = db.scalars(
        select(AssignmentORM)
        .where(AssignmentORM.task_id.in_(task_ids))
        .order_by(AssignmentORM.assigned_at.desc(), AssignmentORM.id.desc())
    ).all()
    latest: dict[str, AssignmentORM] = {}
    for row in ordered_rows:
        if row.task_id not in latest:
            latest[row.task_id] = row
    return latest


def _load_latest_reviews(task_ids: list[str], db: Session) -> dict[str, ReviewCheckpointORM]:
    """Return the newest review checkpoint for each task in *task_ids*.

    Checkpoints are ordered newest-first (by created_at, then id); the
    first row seen per task therefore wins.
    """
    if not task_ids:
        return {}

    ordered_rows = db.scalars(
        select(ReviewCheckpointORM)
        .where(ReviewCheckpointORM.task_id.in_(task_ids))
        .order_by(ReviewCheckpointORM.created_at.desc(), ReviewCheckpointORM.id.desc())
    ).all()
    latest: dict[str, ReviewCheckpointORM] = {}
    for row in ordered_rows:
        if row.task_id not in latest:
            latest[row.task_id] = row
    return latest


def _batch_updated_at(task_batch: TaskBatchORM, tasks: list[TaskORM]) -> datetime:
if not tasks:
return task_batch.created_at
Expand Down Expand Up @@ -454,6 +486,8 @@ def get_task_batch_summary(batch_id: str, db: Session = Depends(get_db)) -> Task
task_ids = [task.id for task in tasks]
latest_runs = _load_latest_runs(task_ids, db)
artifacts, artifact_counts = _load_batch_artifacts(task_ids, db)
latest_assignments = _load_active_assignments(task_ids, db)
latest_reviews = _load_latest_reviews(task_ids, db)
counts = _build_batch_counts(tasks)
progress = _build_batch_progress(tasks)
derived_status = _derive_batch_status(tasks)
Expand All @@ -471,6 +505,15 @@ def get_task_batch_summary(batch_id: str, db: Session = Depends(get_db)) -> Task
output_snapshot=latest_runs.get(task.id).output_snapshot if latest_runs.get(task.id) is not None else {},
error_message=latest_runs.get(task.id).error_message if latest_runs.get(task.id) is not None else None,
cancel_reason=latest_runs.get(task.id).cancel_reason if latest_runs.get(task.id) is not None else None,
error_category=classify_task_error(
task_status=task.status,
dependency_ids=task.dependency_ids,
run_status=latest_runs.get(task.id).run_status if latest_runs.get(task.id) is not None else None,
error_message=latest_runs.get(task.id).error_message if latest_runs.get(task.id) is not None else None,
logs=latest_runs.get(task.id).logs if latest_runs.get(task.id) is not None else None,
routing_reason=latest_assignments.get(task.id).routing_reason if latest_assignments.get(task.id) is not None else None,
review_reason=latest_reviews.get(task.id).reason if latest_reviews.get(task.id) is not None else None,
),
artifact_count=artifact_counts.get(task.id, 0),
)
for task in tasks
Expand All @@ -496,4 +539,23 @@ def get_task_batch_summary(batch_id: str, db: Session = Depends(get_db)) -> Task
progress=progress,
tasks=task_results,
artifacts=artifact_reads,
failure_categories=[
FailureCategorySummaryRead.model_validate(item)
for item in summarize_failure_categories(
[
{
"task_id": task.task_id,
"error_category": task.error_category,
"error_message": task.error_message,
"cancel_reason": task.cancel_reason,
"routing_reason": (
latest_assignments.get(task.task_id).routing_reason
if latest_assignments.get(task.task_id) is not None
else None
),
}
for task in task_results
]
)
],
)
16 changes: 16 additions & 0 deletions src/apps/web/agents.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ function formatPercent(value) {
return `${value}%`;
}

// Render a numeric cost as a dollar string with six decimal places.
// Nullish input is treated as zero.
function formatCurrency(value) {
  const amount = Number(value ?? 0);
  return "$" + amount.toFixed(6);
}

function formatJson(value) {
const entries = value && typeof value === "object" ? Object.keys(value) : [];
if (!entries.length) {
Expand Down Expand Up @@ -127,6 +131,18 @@ function renderAgents(items) {
<div><dt>Success rate</dt><dd>${escapeHtml(formatPercent(item.success_rate))}</dd></div>
</dl>

<dl class="metrics compact">
<div><dt>Avg latency</dt><dd>${escapeHtml(item.average_latency_ms ?? "n/a")} ms</dd></div>
<div><dt>Retry rate</dt><dd>${escapeHtml(formatPercent(item.retry_rate))}</dd></div>
<div><dt>Total tokens</dt><dd>${escapeHtml(item.total_tokens)}</dd></div>
</dl>

<dl class="metrics compact">
<div><dt>Avg total tokens</dt><dd>${escapeHtml(item.average_total_tokens)}</dd></div>
<div><dt>Avg cost estimate</dt><dd>${escapeHtml(formatCurrency(item.average_cost_estimate))}</dd></div>
<div><dt>Total cost estimate</dt><dd>${escapeHtml(formatCurrency(item.total_cost_estimate))}</dd></div>
</dl>

<section class="detail-block">
<strong>Input schema</strong>
<pre class="schema-block">${formatJson(item.input_schema)}</pre>
Expand Down
22 changes: 19 additions & 3 deletions src/apps/web/batch-detail.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ function renderOverview(summary) {
.join("");
}

function renderRiskGroups(tasks) {
function renderRiskGroups(summary) {
const tasks = summary.tasks;
const groups = [
{
key: "failed",
Expand All @@ -77,7 +78,14 @@ function renderRiskGroups(tasks) {
},
];

const activeGroups = groups.filter((group) => group.items.length > 0);
const categoryGroups = (summary.failure_categories ?? []).map((item) => ({
key: item.category,
label: `Error category · ${item.category}`,
items: tasks.filter((task) => task.error_category === item.category),
description: `${item.count} task${item.count === 1 ? "" : "s"} currently grouped here.`,
}));

const activeGroups = [...groups.filter((group) => group.items.length > 0), ...categoryGroups];
if (!activeGroups.length) {
riskGroups.innerHTML = `<div class="empty-panel">No failed, blocked, or review-pending tasks in this batch.</div>`;
return;
Expand Down Expand Up @@ -201,6 +209,9 @@ function taskFlags(task) {
if (task.status === "failed" && task.error_message) {
flags.push("Latest run returned an error");
}
if (task.error_category) {
flags.push(`Error category: ${task.error_category}`);
}
return flags;
}

Expand Down Expand Up @@ -235,6 +246,11 @@ function renderTasks(tasks) {
<span class="meta-pill">agent ${escapeHtml(task.assigned_agent_role ?? "unassigned")}</span>
<span class="meta-pill">latest run ${escapeHtml(task.latest_run_status ?? "not started")}</span>
<span class="meta-pill">${escapeHtml(task.artifact_count)} artifacts</span>
${
task.error_category
? `<span class="meta-pill">error ${escapeHtml(task.error_category)}</span>`
: ""
}
</div>
${
task.latest_run_id
Expand Down Expand Up @@ -317,7 +333,7 @@ async function loadBatchDetail() {
]);
statusText.textContent = `Batch ${summary.batch.id} is currently ${summary.derived_status}.`;
renderOverview(summary);
renderRiskGroups(summary.tasks);
renderRiskGroups(summary);
renderDependencyMap(summary.tasks);
renderArtifacts(summary.artifacts);
renderTimeline(timeline.items);
Expand Down
7 changes: 7 additions & 0 deletions src/apps/web/run-detail.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ function renderOverview(detail) {
["Prompt tokens", detail.run.token_usage?.prompt_tokens ?? 0],
["Completion tokens", detail.run.token_usage?.completion_tokens ?? 0],
["Total tokens", detail.run.token_usage?.total_tokens ?? 0],
["Cost estimate", `$${Number(detail.cost_estimate ?? 0).toFixed(6)}`],
["Error category", detail.error_category ?? "n/a"],
];
overviewMetrics.innerHTML = metrics
.map(([label, value]) => `<div><dt>${label}</dt><dd>${escapeHtml(value)}</dd></div>`)
Expand Down Expand Up @@ -101,6 +103,11 @@ function renderErrorAndLogs(detail) {
<article class="error-entry ${errorClass}">
<strong>${detail.run.run_status === "cancelled" ? "Cancel context" : "Error message"}</strong>
<p>${escapeHtml(detail.run.error_message ?? detail.run.cancel_reason)}</p>
${
detail.error_category
? `<p class="muted">Error category: ${escapeHtml(detail.error_category)}</p>`
: ""
}
</article>
`;
} else {
Expand Down
13 changes: 13 additions & 0 deletions src/packages/core/costs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from __future__ import annotations

# Flat per-1K-token pricing used for rough run-cost estimates.
PROMPT_COST_PER_1K = 0.001
COMPLETION_COST_PER_1K = 0.002


def estimate_cost(token_usage: dict | None) -> float:
    """Estimate the dollar cost of a run from its token-usage mapping.

    Missing keys, ``None`` values, and a ``None`` mapping all count as
    zero tokens. The result is rounded to six decimal places.
    """
    usage = token_usage if token_usage is not None else {}
    prompt = int(usage.get("prompt_tokens", 0) or 0)
    completion = int(usage.get("completion_tokens", 0) or 0)
    total = (prompt / 1000) * PROMPT_COST_PER_1K
    total += (completion / 1000) * COMPLETION_COST_PER_1K
    return round(total, 6)
Loading
Loading