Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/apps/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ def console_batches() -> FileResponse:
return FileResponse(WEB_DIR / "index.html")


@app.get("/console/agents")
def console_agents() -> FileResponse:
return FileResponse(WEB_DIR / "agents.html")


@app.get("/console/batches/{batch_id}")
def console_batch_detail(batch_id: str) -> FileResponse:
return FileResponse(WEB_DIR / "batch-detail.html")
Expand Down
123 changes: 121 additions & 2 deletions src/apps/api/routers/agents.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
from __future__ import annotations

from typing import Any

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy import case, func, select
from sqlalchemy.orm import Session

from src.apps.api.deps import get_db
from src.packages.core.db.models import AgentRoleORM
from src.packages.core.db.models import AgentRoleORM, ExecutionRunORM
from src.packages.core.schemas import (
AgentCapabilityDeclaration,
AgentRoleDetailRead,
AgentRegistryDiagnosisRead,
AgentRegistryListItemRead,
AgentRegistryResponse,
AgentRoleRegisterRequest,
AgentRoleUpdateRequest,
)
Expand Down Expand Up @@ -42,6 +47,32 @@ def _to_agent_detail(agent_role: AgentRoleORM) -> AgentRoleDetailRead:
)


def _to_agent_registry_item(
agent_role: AgentRoleORM,
*,
total_runs: int,
success_runs: int,
) -> AgentRegistryListItemRead:
success_rate = None
if total_runs > 0:
success_rate = round((success_runs / total_runs) * 100, 2)

return AgentRegistryListItemRead(
id=agent_role.id,
role_name=agent_role.role_name,
description=agent_role.description,
capabilities=agent_role.capabilities,
capability_declaration=_build_capability_declaration(agent_role),
input_schema=agent_role.input_schema,
output_schema=agent_role.output_schema,
enabled=agent_role.enabled,
version=agent_role.version,
total_runs=total_runs,
success_runs=success_runs,
success_rate=success_rate,
)


def _merge_input_schema(
base_schema: dict,
capability_declaration: AgentCapabilityDeclaration,
Expand All @@ -63,6 +94,50 @@ def _merge_output_schema(
return merged


def _supported_task_types(agent_role: AgentRoleORM) -> list[str]:
supported = agent_role.input_schema.get("supported_task_types", [])
if isinstance(supported, list):
return [str(item) for item in supported]
return []


def _build_registry_diagnosis(
agent_roles: list[AgentRoleORM],
task_type: str,
) -> AgentRegistryDiagnosisRead:
matching_enabled_roles: list[str] = []
matching_disabled_roles: list[str] = []

for agent_role in agent_roles:
if task_type not in _supported_task_types(agent_role):
continue
if agent_role.enabled:
matching_enabled_roles.append(agent_role.role_name)
else:
matching_disabled_roles.append(agent_role.role_name)

if matching_enabled_roles:
status_name = "matched_enabled"
message = f"Found {len(matching_enabled_roles)} enabled role(s) for task_type={task_type}."
elif matching_disabled_roles:
status_name = "matched_disabled_only"
message = (
f"No enabled role can execute task_type={task_type}; "
"matching roles exist but are disabled."
)
else:
status_name = "no_match"
message = f"No agent role declares support for task_type={task_type}."

return AgentRegistryDiagnosisRead(
task_type=task_type,
status=status_name,
message=message,
matching_enabled_roles=matching_enabled_roles,
matching_disabled_roles=matching_disabled_roles,
)


@router.post("/register", response_model=AgentRoleDetailRead, status_code=status.HTTP_201_CREATED)
def register_agent(
payload: AgentRoleRegisterRequest,
Expand Down Expand Up @@ -100,6 +175,50 @@ def list_agents(db: Session = Depends(get_db)) -> list[AgentRoleDetailRead]:
return [_to_agent_detail(agent_role) for agent_role in agent_roles]


@router.get("/registry", response_model=AgentRegistryResponse)
def get_agent_registry(
task_type: str | None = None,
db: Session = Depends(get_db),
) -> AgentRegistryResponse:
agent_roles = db.scalars(select(AgentRoleORM).order_by(AgentRoleORM.role_name)).all()

run_stats_rows = db.execute(
select(
ExecutionRunORM.agent_role_id,
func.count(ExecutionRunORM.id).label("total_runs"),
func.coalesce(
func.sum(case((ExecutionRunORM.run_status == "success", 1), else_=0)),
0,
).label("success_runs"),
)
.group_by(ExecutionRunORM.agent_role_id)
).all()

run_stats_by_role_id: dict[str, dict[str, Any]] = {
row.agent_role_id: {
"total_runs": int(row.total_runs or 0),
"success_runs": int(row.success_runs or 0),
}
for row in run_stats_rows
}

items = [
_to_agent_registry_item(
agent_role,
total_runs=run_stats_by_role_id.get(agent_role.id, {}).get("total_runs", 0),
success_runs=run_stats_by_role_id.get(agent_role.id, {}).get("success_runs", 0),
)
for agent_role in agent_roles
]

diagnosis = None
normalized_task_type = task_type.strip() if task_type else ""
if normalized_task_type:
diagnosis = _build_registry_diagnosis(agent_roles, normalized_task_type)

return AgentRegistryResponse(items=items, diagnosis=diagnosis)


@router.get("/{agent_id}", response_model=AgentRoleDetailRead)
def get_agent(agent_id: str, db: Session = Depends(get_db)) -> AgentRoleDetailRead:
agent_role = db.get(AgentRoleORM, agent_id)
Expand Down
10 changes: 10 additions & 0 deletions src/apps/api/routers/task_batches.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
TaskORM,
)
from src.packages.core.schemas import (
BatchTimelineRead,
BatchArtifactRead,
BatchCountsRead,
BatchProgressRead,
Expand All @@ -32,6 +33,7 @@
TaskBatchSubmitResponse,
TaskBatchSubmitTaskRead,
)
from src.packages.core.timeline import load_batch_timeline
from src.packages.core.task_state_machine import transition_task_status
from src.packages.router import route_task

Expand Down Expand Up @@ -430,6 +432,14 @@ def get_task_batch(batch_id: str, db: Session = Depends(get_db)) -> TaskBatchRea
return TaskBatchRead.model_validate(task_batch)


@router.get("/{batch_id}/timeline", response_model=BatchTimelineRead)
def get_task_batch_timeline(batch_id: str, db: Session = Depends(get_db)) -> BatchTimelineRead:
timeline = load_batch_timeline(db, batch_id)
if timeline is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task batch not found")
return timeline


@router.get("/{batch_id}/summary", response_model=TaskBatchSummaryRead)
def get_task_batch_summary(batch_id: str, db: Session = Depends(get_db)) -> TaskBatchSummaryRead:
task_batch = db.get(TaskBatchORM, batch_id)
Expand Down
44 changes: 43 additions & 1 deletion src/apps/api/routers/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@

from src.apps.api.deps import get_db
from src.packages.core.db.models import EventLogORM, TaskORM
from src.packages.core.schemas import TaskCancelRequest, TaskEventRead, TaskRead
from src.packages.core.schemas import (
TaskCancelRequest,
TaskEventRead,
TaskRead,
TaskTimelineRead,
TaskStatusHistoryItemRead,
)
from src.packages.core.timeline import load_task_timeline
from src.packages.core.task_state_machine import TaskStatusTransitionError, transition_task_status

router = APIRouter(prefix="/tasks", tags=["tasks"])
Expand Down Expand Up @@ -40,6 +47,41 @@ def get_task_events(task_id: str, db: Session = Depends(get_db)) -> list[TaskEve
return [TaskEventRead.model_validate(event) for event in events]


@router.get("/{task_id}/status-history", response_model=list[TaskStatusHistoryItemRead])
def get_task_status_history(task_id: str, db: Session = Depends(get_db)) -> list[TaskStatusHistoryItemRead]:
task = db.get(TaskORM, task_id)
if task is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")

events = db.scalars(
select(EventLogORM)
.where(
EventLogORM.task_id == task_id,
EventLogORM.event_type == "task_status_changed",
)
.order_by(EventLogORM.created_at.asc(), EventLogORM.id.asc())
).all()
return [
TaskStatusHistoryItemRead(
task_id=event.task_id or task_id,
old_status=event.payload.get("from_status"),
new_status=event.payload.get("to_status") or event.event_status or "unknown",
timestamp=event.created_at,
reason=event.message,
actor=event.payload.get("source"),
)
for event in events
]


@router.get("/{task_id}/timeline", response_model=TaskTimelineRead)
def get_task_timeline(task_id: str, db: Session = Depends(get_db)) -> TaskTimelineRead:
timeline = load_task_timeline(db, task_id)
if timeline is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")
return timeline


@router.post("/{task_id}/cancel", response_model=TaskRead)
def cancel_task(
task_id: str,
Expand Down
Loading
Loading