diff --git a/src/apps/web/run-detail.js b/src/apps/web/run-detail.js
index 784373f..86896df 100644
--- a/src/apps/web/run-detail.js
+++ b/src/apps/web/run-detail.js
@@ -10,6 +10,7 @@ const outputSnapshot = document.getElementById("output-snapshot");
const errorPanel = document.getElementById("error-panel");
const logList = document.getElementById("log-list");
const retryHistory = document.getElementById("retry-history");
+const taskTimeline = document.getElementById("task-timeline");
const eventList = document.getElementById("event-list");
const backToBatch = document.getElementById("back-to-batch");
@@ -144,6 +145,29 @@ function renderRetryHistory(detail) {
.join("");
}
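+// Render the task lifecycle timeline fetched from /tasks/{task_id}/timeline.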
+function renderTaskTimeline(timeline) {
+  if (!timeline.items.length) {
+    taskTimeline.innerHTML = `No lifecycle timeline available.`;
+    return;
+  }
+
+  taskTimeline.innerHTML = timeline.items
+    .map(
+      (item) => `
+        ${escapeHtml(item.title)}
+        ${escapeHtml(item.detail ?? "No additional detail.")}
+        ${escapeHtml(item.stage)} · ${escapeHtml(formatDate(item.timestamp))}
+        · run ${escapeHtml(item.run_id ?? "n/a")}
+        · actor ${escapeHtml(item.actor ?? "system")}
+      `,
+    )
+    .join("");
+}
+
function renderEvents(detail) {
if (!detail.events.length) {
    eventList.innerHTML = `No task events available.`;
@@ -171,6 +195,7 @@ function renderError(message) {
  errorPanel.innerHTML = `${escapeHtml(message)}`;
  logList.innerHTML = `${escapeHtml(message)}`;
  retryHistory.innerHTML = `${escapeHtml(message)}`;
+  taskTimeline.innerHTML = `${escapeHtml(message)}`;
  eventList.innerHTML = `${escapeHtml(message)}`;
}
@@ -182,20 +207,26 @@ async function loadRunDetail() {
}
try {
- const response = await fetch(`/runs/${runId}/detail`);
- if (response.status === 404) {
+ const detailResponse = await fetch(`/runs/${runId}/detail`);
+ if (detailResponse.status === 404) {
throw new Error("Run not found.");
}
- if (!response.ok) {
- throw new Error(`Request failed with status ${response.status}`);
+ if (!detailResponse.ok) {
+ throw new Error(`Request failed with status ${detailResponse.status}`);
}
- const detail = await response.json();
+ const detail = await detailResponse.json();
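+    // Also fetch the lifecycle timeline for the run's parent task.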
+ const timelineResponse = await fetch(`/tasks/${detail.task.task_id}/timeline`);
+ if (!timelineResponse.ok) {
+ throw new Error(`Timeline request failed with status ${timelineResponse.status}`);
+ }
+ const timeline = await timelineResponse.json();
renderOverview(detail);
renderRouting(detail);
renderSnapshots(detail);
renderErrorAndLogs(detail);
renderRetryHistory(detail);
+ renderTaskTimeline(timeline);
renderEvents(detail);
} catch (error) {
renderError(error.message);
diff --git a/src/packages/core/schemas.py b/src/packages/core/schemas.py
index 051e2e1..9fd0cbc 100644
--- a/src/packages/core/schemas.py
+++ b/src/packages/core/schemas.py
@@ -175,6 +175,38 @@ class TaskEventRead(SchemaModel):
created_at: datetime
+class TaskStatusHistoryItemRead(SchemaModel):
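+    # A single status transition as recorded by a task_status_changed event.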
+ task_id: str
+ old_status: str | None = None
+ new_status: str
+ timestamp: datetime
+ reason: str | None = None
+ actor: str | None = None
+
+
+class TimelineItemRead(SchemaModel):
+ timestamp: datetime
+ stage: str
+ title: str
+ detail: str | None = None
+ task_id: str | None = None
+ run_id: str | None = None
+ status: str | None = None
+ actor: str | None = None
+
+
+class TaskTimelineRead(SchemaModel):
+ task_id: str
+ batch_id: str
+ items: list[TimelineItemRead] = Field(default_factory=list)
+
+
+class BatchTimelineRead(SchemaModel):
+ batch_id: str
+ title: str
+ items: list[TimelineItemRead] = Field(default_factory=list)
+
+
class AgentRoleCreate(SchemaModel):
role_name: str
description: str | None = None
@@ -238,6 +270,34 @@ class AgentRoleDetailRead(SchemaModel):
version: str
+class AgentRegistryListItemRead(SchemaModel):
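+    # Registry row with aggregated run history; success_rate is a percentage
+    # rounded to two decimals and is None when the role has no recorded runs.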
+ id: str
+ role_name: str
+ description: str | None = None
+ capabilities: list[str] = Field(default_factory=list)
+ capability_declaration: AgentCapabilityDeclaration
+ input_schema: dict[str, Any] = Field(default_factory=dict)
+ output_schema: dict[str, Any] = Field(default_factory=dict)
+ enabled: bool
+ version: str
+ total_runs: int = 0
+ success_runs: int = 0
+ success_rate: float | None = None
+
+
+class AgentRegistryDiagnosisRead(SchemaModel):
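+    # status is one of "matched_enabled", "matched_disabled_only", or "no_match".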
+ task_type: str
+ status: str
+ message: str
+ matching_enabled_roles: list[str] = Field(default_factory=list)
+ matching_disabled_roles: list[str] = Field(default_factory=list)
+
+
+class AgentRegistryResponse(SchemaModel):
+ items: list[AgentRegistryListItemRead] = Field(default_factory=list)
+ diagnosis: AgentRegistryDiagnosisRead | None = None
+
+
class AssignmentCreate(SchemaModel):
task_id: str
agent_role_id: str
diff --git a/src/packages/core/timeline.py b/src/packages/core/timeline.py
new file mode 100644
index 0000000..860e58d
--- /dev/null
+++ b/src/packages/core/timeline.py
@@ -0,0 +1,253 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from datetime import datetime
+
+from sqlalchemy import select
+from sqlalchemy.orm import Session
+
+from src.packages.core.db.models import EventLogORM, TaskBatchORM, TaskORM
+from src.packages.core.schemas import BatchTimelineRead, TaskTimelineRead, TimelineItemRead
+
+
+@dataclass(frozen=True)
+class _SortableTimelineItem:
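+    """Pairs a timeline item with tie-breakers used for stable chronological sorting."""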
+ item: TimelineItemRead
+ event_id: str
+ sequence: int
+
+
+def _timeline_item(
+ *,
+    timestamp: datetime,
+ stage: str,
+ title: str,
+ detail: str | None,
+ task_id: str | None,
+ run_id: str | None,
+ status: str | None,
+ actor: str | None,
+) -> TimelineItemRead:
+ return TimelineItemRead(
+ timestamp=timestamp,
+ stage=stage,
+ title=title,
+ detail=detail,
+ task_id=task_id,
+ run_id=run_id,
+ status=status,
+ actor=actor,
+ )
+
+
+def _status_stage(to_status: str) -> tuple[str, str]:
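+    """Map a task status to the (stage, title) pair shown on the timeline."""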
+ mapping = {
+ "queued": ("queued", "Task queued"),
+ "blocked": ("blocked", "Task blocked"),
+ "running": ("running", "Task started"),
+ "success": ("completed", "Task completed"),
+ "failed": ("failed", "Task failed"),
+ "cancelled": ("cancelled", "Task cancelled"),
+ "needs_review": ("review", "Task sent to review"),
+ }
+ return mapping.get(to_status, ("status", f"Task moved to {to_status}"))
+
+
+def build_task_timeline(task: TaskORM, events: list[EventLogORM]) -> TaskTimelineRead:
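+    """Build a task's lifecycle timeline from its creation record and event log."""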
+ sortable_items: list[_SortableTimelineItem] = [
+ _SortableTimelineItem(
+ item=_timeline_item(
+ timestamp=task.created_at,
+ stage="created",
+ title="Task created",
+ detail=task.description,
+ task_id=task.id,
+ run_id=None,
+ status="pending",
+ actor="system",
+ ),
+ event_id=f"created-{task.id}",
+ sequence=0,
+ )
+ ]
+
+ for event in events:
+ payload = event.payload or {}
+ task_id = event.task_id or task.id
+ actor = payload.get("source")
+ detail = event.message
+ sequence = 0
+
+ if event.event_type == "task_status_changed":
+ from_status = payload.get("from_status")
+ to_status = payload.get("to_status") or event.event_status or "unknown"
+
+ if actor == "router":
+ sortable_items.append(
+ _SortableTimelineItem(
+ item=_timeline_item(
+ timestamp=event.created_at,
+ stage="routed",
+ title="Task routed",
+ detail=detail,
+ task_id=task_id,
+ run_id=event.run_id,
+ status=to_status,
+ actor=actor,
+ ),
+ event_id=event.id,
+ sequence=sequence,
+ )
+ )
+ sequence += 1
+
+ if from_status == "failed" and to_status == "queued":
+ sortable_items.append(
+ _SortableTimelineItem(
+ item=_timeline_item(
+ timestamp=event.created_at,
+ stage="retry",
+ title="Retry requested",
+ detail=detail,
+ task_id=task_id,
+ run_id=event.run_id,
+ status=to_status,
+ actor=actor,
+ ),
+ event_id=event.id,
+ sequence=sequence,
+ )
+ )
+ sequence += 1
+
+ stage, title = _status_stage(to_status)
+ sortable_items.append(
+ _SortableTimelineItem(
+ item=_timeline_item(
+ timestamp=event.created_at,
+ stage=stage,
+ title=title,
+ detail=detail,
+ task_id=task_id,
+ run_id=event.run_id,
+ status=to_status,
+ actor=actor,
+ ),
+ event_id=event.id,
+ sequence=sequence,
+ )
+ )
+ continue
+
+ stage_title_mapping = {
+ "review_checkpoint_created": ("review", "Approval requested"),
+ "review_approved": ("review", "Review approved"),
+ "review_rejected": ("review", "Review rejected"),
+ "task_review_resolved": ("review", "Review resolved"),
+ "execution_run_started": ("running", "Execution started"),
+ "execution_run_finished": (
+ "completed" if event.event_status == "success" else "failed",
+ "Execution finished" if event.event_status == "success" else "Execution failed",
+ ),
+ "execution_run_cancelled": ("cancelled", "Execution cancelled"),
+ "task_cancellation_requested": ("cancelled", "Cancellation requested"),
+ "task_cancellation_completed": ("cancelled", "Cancellation completed"),
+ "task_unblocked": ("queued", "Dependencies resolved"),
+ }
+ if event.event_type not in stage_title_mapping:
+ continue
+ stage, title = stage_title_mapping[event.event_type]
+ sortable_items.append(
+ _SortableTimelineItem(
+ item=_timeline_item(
+ timestamp=event.created_at,
+ stage=stage,
+ title=title,
+ detail=detail,
+ task_id=task_id,
+ run_id=event.run_id,
+ status=event.event_status,
+ actor=actor,
+ ),
+ event_id=event.id,
+ sequence=0,
+ )
+ )
+
+ items = [
+ sortable.item
+ for sortable in sorted(
+ sortable_items,
+ key=lambda entry: (entry.item.timestamp, entry.event_id, entry.sequence),
+ )
+ ]
+ return TaskTimelineRead(task_id=task.id, batch_id=task.batch_id, items=items)
+
+
+def load_task_timeline(db: Session, task_id: str) -> TaskTimelineRead | None:
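+    """Return the task's timeline, or None when the task does not exist."""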
+ task = db.get(TaskORM, task_id)
+ if task is None:
+ return None
+
+ events = db.scalars(
+ select(EventLogORM)
+ .where(EventLogORM.task_id == task.id)
+ .order_by(EventLogORM.created_at.asc(), EventLogORM.id.asc())
+ ).all()
+ return build_task_timeline(task, events)
+
+
+def load_batch_timeline(db: Session, batch_id: str) -> BatchTimelineRead | None:
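+    """Merge the batch creation entry with each child task's timeline items; None if the batch is missing."""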
+ batch = db.get(TaskBatchORM, batch_id)
+ if batch is None:
+ return None
+
+ tasks = db.scalars(
+ select(TaskORM)
+ .where(TaskORM.batch_id == batch.id)
+ .order_by(TaskORM.created_at.asc(), TaskORM.id.asc())
+ ).all()
+
+ sortable_items: list[_SortableTimelineItem] = [
+ _SortableTimelineItem(
+ item=_timeline_item(
+ timestamp=batch.created_at,
+ stage="created",
+ title="Batch created",
+ detail=batch.description,
+ task_id=None,
+ run_id=None,
+ status=batch.status,
+ actor=batch.created_by,
+ ),
+ event_id=f"created-{batch.id}",
+ sequence=0,
+ )
+ ]
+
+ for task in tasks:
+ task_timeline = load_task_timeline(db, task.id)
+ if task_timeline is None:
+ continue
+ for index, item in enumerate(task_timeline.items):
+ sortable_items.append(
+ _SortableTimelineItem(
+ item=item,
+ event_id=f"{task.id}-{index}",
+ sequence=index,
+ )
+ )
+
+ items = [
+ sortable.item
+ for sortable in sorted(
+ sortable_items,
+ key=lambda entry: (entry.item.timestamp, entry.event_id, entry.sequence),
+ )
+ ]
+ return BatchTimelineRead(batch_id=batch.id, title=batch.title, items=items)
diff --git a/src/tests/test_agent_registry_page.py b/src/tests/test_agent_registry_page.py
new file mode 100644
index 0000000..610c244
--- /dev/null
+++ b/src/tests/test_agent_registry_page.py
@@ -0,0 +1,412 @@
+from __future__ import annotations
+
+import os
+import sys
+import uuid
+from pathlib import Path
+
+from fastapi.testclient import TestClient
+from sqlalchemy import create_engine, text
+from sqlalchemy.orm import Session
+
+
+ROOT = Path(__file__).resolve().parents[2]
+TEST_ROLE_PREFIX = "registry-test-role-"
+TEST_BATCH_PREFIX = "registry-test-batch-"
+
+if str(ROOT) not in sys.path:
+ sys.path.insert(0, str(ROOT))
+
+
+def _database_url() -> str:
+ database_url = os.getenv("DATABASE_URL")
+ if not database_url:
+ raise RuntimeError("DATABASE_URL is not set")
+ return database_url
+
+
+def _cleanup_database() -> None:
+ engine = create_engine(_database_url())
+ with engine.begin() as conn:
+ conn.execute(
+ text(
+ """
+ DELETE FROM task_batches
+ WHERE title LIKE :batch_prefix
+ """
+ ),
+ {"batch_prefix": f"{TEST_BATCH_PREFIX}%"},
+ )
+ conn.execute(
+ text(
+ """
+ DELETE FROM agent_roles
+ WHERE role_name LIKE :role_prefix
+ """
+ ),
+ {"role_prefix": f"{TEST_ROLE_PREFIX}%"},
+ )
+
+
+def _register_agent(
+ client: TestClient,
+ *,
+ role_name: str,
+ supported_task_types: list[str],
+ enabled: bool = True,
+) -> dict:
+ payload = {
+ "role_name": role_name,
+ "description": "registry test role",
+ "capabilities": [f"task:{role_name}", "task:registry"],
+ "capability_declaration": {
+ "supported_task_types": supported_task_types,
+ "input_requirements": {"type": "object", "properties": {"text": {"type": "string"}}},
+ "output_contract": {"type": "object", "properties": {"summary": {"type": "string"}}},
+ "supports_concurrency": True,
+ "allows_auto_retry": False,
+ },
+ "input_schema": {"schema_version": "1"},
+ "output_schema": {"schema_version": "1"},
+ "timeout_seconds": 300,
+ "max_retries": 0,
+ "enabled": enabled,
+ "version": "1.0.0",
+ }
+ response = client.post("/agents/register", json=payload)
+ assert response.status_code == 201
+ return response.json()
+
+
+def _create_batch_with_task(client: TestClient, *, task_type: str, suffix: str) -> str:
+ response = client.post(
+ "/task-batches",
+ json={
+ "title": f"{TEST_BATCH_PREFIX}{suffix}",
+ "description": "registry test batch",
+ "created_by": "pytest",
+ "metadata": {"suite": "agent-registry"},
+ "tasks": [
+ {
+ "client_task_id": "task_1",
+ "title": f"registry task {suffix} 1",
+ "task_type": task_type,
+ "priority": "medium",
+ "input_payload": {"text": "alpha"},
+ "expected_output_schema": {"type": "object"},
+ "dependency_client_task_ids": [],
+ },
+ {
+ "client_task_id": "task_2",
+ "title": f"registry task {suffix} 2",
+ "task_type": task_type,
+ "priority": "medium",
+ "input_payload": {"text": "beta"},
+ "expected_output_schema": {"type": "object"},
+ "dependency_client_task_ids": [],
+ },
+ {
+ "client_task_id": "task_3",
+ "title": f"registry task {suffix} 3",
+ "task_type": task_type,
+ "priority": "medium",
+ "input_payload": {"text": "gamma"},
+ "expected_output_schema": {"type": "object"},
+ "dependency_client_task_ids": [],
+ },
+ ],
+ },
+ )
+ assert response.status_code == 201
+ return response.json()["tasks"][0]["task_id"]
+
+
+def _insert_runs(task_id: str, agent_role_id: str, run_statuses: list[str]) -> None:
+ engine = create_engine(_database_url())
+ with Session(engine) as session:
+ for run_status in run_statuses:
+ session.execute(
+ text(
+ """
+ INSERT INTO execution_runs (
+ id,
+ task_id,
+ agent_role_id,
+ run_status,
+ started_at,
+ finished_at,
+ logs,
+ input_snapshot,
+ output_snapshot,
+ error_message,
+ cancelled_at,
+ cancel_reason,
+ token_usage,
+ latency_ms
+ ) VALUES (
+ :id,
+ :task_id,
+ :agent_role_id,
+ :run_status,
+ NOW(),
+ NOW(),
+ ARRAY[]::text[],
+ '{}'::jsonb,
+ '{}'::jsonb,
+ NULL,
+ NULL,
+ NULL,
+ '{}'::jsonb,
+ 100
+ )
+ """
+ ),
+ {
+ "id": f"run_{uuid.uuid4().hex}",
+ "task_id": task_id,
+ "agent_role_id": agent_role_id,
+ "run_status": run_status,
+ },
+ )
+ session.commit()
+
+
+def _seed_registry_history(*, suffix: str, run_statuses: list[str]) -> dict:
+ batch_id = f"batch_{uuid.uuid4().hex}"
+ task_id = f"task_{uuid.uuid4().hex}"
+ role = _register_agent(
+ client,
+ role_name=f"{TEST_ROLE_PREFIX}{suffix}",
+ supported_task_types=["registry_success_case"],
+ )
+ role_id = role["id"]
+ role_name = role["role_name"]
+
+ engine = create_engine(_database_url())
+ with Session(engine) as session:
+ session.execute(
+ text(
+ """
+ INSERT INTO task_batches (
+ id,
+ title,
+ description,
+ created_by,
+ created_at,
+ status,
+ total_tasks,
+ metadata
+ ) VALUES (
+ :id,
+ :title,
+ :description,
+ 'pytest',
+ NOW(),
+ 'submitted',
+ 1,
+ '{}'::jsonb
+ )
+ """
+ ),
+ {
+ "id": batch_id,
+ "title": f"{TEST_BATCH_PREFIX}{suffix}",
+ "description": "registry seeded batch",
+ },
+ )
+ session.execute(
+ text(
+ """
+ INSERT INTO tasks (
+ id,
+ batch_id,
+ title,
+ description,
+ task_type,
+ priority,
+ status,
+ input_payload,
+ expected_output_schema,
+ assigned_agent_role,
+ dependency_ids,
+ retry_count,
+ cancellation_requested,
+ cancellation_requested_at,
+ cancellation_reason,
+ created_at,
+ updated_at
+ ) VALUES (
+ :id,
+ :batch_id,
+ :title,
+ :description,
+ 'registry_success_case',
+ 'medium',
+ 'success',
+ '{}'::jsonb,
+ '{}'::jsonb,
+ :assigned_agent_role,
+ ARRAY[]::varchar[],
+ 0,
+ FALSE,
+ NULL,
+ NULL,
+ NOW(),
+ NOW()
+ )
+ """
+ ),
+ {
+ "id": task_id,
+ "batch_id": batch_id,
+ "title": f"registry task {suffix}",
+ "description": "registry seeded task",
+ "assigned_agent_role": role_name,
+ },
+ )
+ for run_status in run_statuses:
+ session.execute(
+ text(
+ """
+ INSERT INTO execution_runs (
+ id,
+ task_id,
+ agent_role_id,
+ run_status,
+ started_at,
+ finished_at,
+ logs,
+ input_snapshot,
+ output_snapshot,
+ error_message,
+ cancelled_at,
+ cancel_reason,
+ token_usage,
+ latency_ms
+ ) VALUES (
+ :id,
+ :task_id,
+ :agent_role_id,
+ :run_status,
+ NOW(),
+ NOW(),
+ ARRAY[]::text[],
+ '{}'::jsonb,
+ '{}'::jsonb,
+ NULL,
+ NULL,
+ NULL,
+ '{}'::jsonb,
+ 100
+ )
+ """
+ ),
+ {
+ "id": f"run_{uuid.uuid4().hex}",
+ "task_id": task_id,
+ "agent_role_id": role_id,
+ "run_status": run_status,
+ },
+ )
+ session.commit()
+
+ return role
+
+
+_cleanup_database()
+
+from src.apps.api.app import app # noqa: E402
+
+
+client = TestClient(app)
+
+
+def setup_function() -> None:
+ _cleanup_database()
+
+
+def teardown_function() -> None:
+ _cleanup_database()
+
+
+def test_agent_registry_aggregates_run_history_and_success_rate() -> None:
+ suffix = uuid.uuid4().hex[:8]
+ role = _seed_registry_history(suffix=suffix, run_statuses=["success", "failed", "cancelled"])
+
+ response = client.get("/agents/registry")
+ assert response.status_code == 200
+ payload = response.json()
+ registry_item = next(item for item in payload["items"] if item["id"] == role["id"])
+
+ assert registry_item["role_name"] == role["role_name"]
+ assert registry_item["total_runs"] == 3
+ assert registry_item["success_runs"] == 1
+ assert registry_item["success_rate"] == 33.33
+
+
+def test_agent_registry_diagnosis_distinguishes_enabled_disabled_and_missing_matches() -> None:
+ suffix = uuid.uuid4().hex[:8]
+ enabled_role = _register_agent(
+ client,
+ role_name=f"{TEST_ROLE_PREFIX}enabled-{suffix}",
+ supported_task_types=["match_enabled_case"],
+ )
+ disabled_role = _register_agent(
+ client,
+ role_name=f"{TEST_ROLE_PREFIX}disabled-{suffix}",
+ supported_task_types=["match_disabled_case"],
+ enabled=False,
+ )
+
+ enabled_response = client.get("/agents/registry", params={"task_type": "match_enabled_case"})
+ assert enabled_response.status_code == 200
+ enabled_diagnosis = enabled_response.json()["diagnosis"]
+ assert enabled_diagnosis["status"] == "matched_enabled"
+ assert enabled_diagnosis["matching_enabled_roles"] == [enabled_role["role_name"]]
+ assert enabled_diagnosis["matching_disabled_roles"] == []
+
+ disabled_response = client.get("/agents/registry", params={"task_type": "match_disabled_case"})
+ assert disabled_response.status_code == 200
+ disabled_diagnosis = disabled_response.json()["diagnosis"]
+ assert disabled_diagnosis["status"] == "matched_disabled_only"
+ assert disabled_diagnosis["matching_enabled_roles"] == []
+ assert disabled_diagnosis["matching_disabled_roles"] == [disabled_role["role_name"]]
+
+ missing_response = client.get("/agents/registry", params={"task_type": "totally_missing_case"})
+ assert missing_response.status_code == 200
+ missing_diagnosis = missing_response.json()["diagnosis"]
+ assert missing_diagnosis["status"] == "no_match"
+ assert missing_diagnosis["matching_enabled_roles"] == []
+ assert missing_diagnosis["matching_disabled_roles"] == []
+
+
+def test_agent_registry_reports_no_run_history_when_role_has_no_runs() -> None:
+ suffix = uuid.uuid4().hex[:8]
+ role = _register_agent(
+ client,
+ role_name=f"{TEST_ROLE_PREFIX}no-runs-{suffix}",
+ supported_task_types=["no_runs_case"],
+ )
+
+ response = client.get("/agents/registry")
+ assert response.status_code == 200
+ registry_item = next(item for item in response.json()["items"] if item["id"] == role["id"])
+
+ assert registry_item["total_runs"] == 0
+ assert registry_item["success_runs"] == 0
+ assert registry_item["success_rate"] is None
+
+
+def test_console_agent_registry_page_is_accessible() -> None:
+ response = client.get("/console/agents")
+ assert response.status_code == 200
+ assert "Agent Registry" in response.text
+ assert "/console/assets/agents.js" in response.text
+
+
+def test_agent_registry_assets_include_success_rate_and_diagnosis_copy() -> None:
+ response = client.get("/console/assets/agents.js")
+ assert response.status_code == 200
+ assert "Success rate" in response.text
+ assert "Why no suitable role?" in client.get("/console/agents").text
+ assert "No run history" in response.text
diff --git a/src/tests/test_batch_detail_page.py b/src/tests/test_batch_detail_page.py
index 91141ec..253a1b1 100644
--- a/src/tests/test_batch_detail_page.py
+++ b/src/tests/test_batch_detail_page.py
@@ -144,6 +144,16 @@ def test_batch_detail_page_can_link_to_run_detail_when_latest_run_exists() -> No
assert "View run detail" in response.text
+def test_batch_detail_page_assets_include_batch_timeline() -> None:
+ page_response = client.get("/console/batches/sample-batch-id")
+ assert page_response.status_code == 200
+ assert "Execution timeline" in page_response.text
+
+ asset_response = client.get("/console/assets/batch-detail.js")
+ assert asset_response.status_code == 200
+ assert "/task-batches/${batchId}/timeline" in asset_response.text
+
+
def test_batch_detail_summary_supports_mixed_risk_sections() -> None:
suffix = uuid.uuid4().hex[:8]
_register_agent(client, role_name="default_worker", supported_task_types=[])
diff --git a/src/tests/test_execution_timeline_api.py b/src/tests/test_execution_timeline_api.py
new file mode 100644
index 0000000..b7618e9
--- /dev/null
+++ b/src/tests/test_execution_timeline_api.py
@@ -0,0 +1,215 @@
+from __future__ import annotations
+
+import os
+import sys
+import uuid
+from datetime import datetime, timedelta, timezone
+from pathlib import Path
+
+from fastapi.testclient import TestClient
+from sqlalchemy import create_engine, text
+from sqlalchemy.orm import Session
+
+
+ROOT = Path(__file__).resolve().parents[2]
+TEST_PREFIX = "execution-timeline-test-"
+
+if str(ROOT) not in sys.path:
+ sys.path.insert(0, str(ROOT))
+
+
+def _database_url() -> str:
+ database_url = os.getenv("DATABASE_URL")
+ if not database_url:
+ raise RuntimeError("DATABASE_URL is not set")
+ return database_url
+
+
+def _cleanup_database() -> None:
+ engine = create_engine(_database_url())
+ with engine.begin() as conn:
+ conn.execute(text("DELETE FROM task_batches WHERE title LIKE :prefix"), {"prefix": f"{TEST_PREFIX}%"})
+ conn.execute(text("DELETE FROM agent_roles WHERE role_name LIKE :prefix"), {"prefix": f"{TEST_PREFIX}%"})
+
+
+def _register_agent(client: TestClient, *, role_name: str, supported_task_types: list[str]) -> dict:
+ payload = {
+ "role_name": role_name,
+ "description": "timeline role",
+ "capabilities": [f"task:{role_name}"],
+ "capability_declaration": {
+ "supported_task_types": supported_task_types,
+ "input_requirements": {"properties": {"text": {"type": "string"}}},
+ "output_contract": {"type": "object"},
+ "supports_concurrency": True,
+ "allows_auto_retry": False,
+ },
+ "input_schema": {},
+ "output_schema": {},
+ "timeout_seconds": 300,
+ "max_retries": 0,
+ "enabled": True,
+ "version": "1.0.0",
+ }
+ response = client.post("/agents/register", json=payload)
+ assert response.status_code == 201
+ return response.json()
+
+
+def _batch_payload(task_type: str, suffix: str) -> dict:
+ return {
+ "title": f"{TEST_PREFIX}batch-{suffix}",
+ "description": "timeline batch",
+ "created_by": "pytest",
+ "metadata": {"suite": "execution-timeline"},
+ "tasks": [
+ {
+ "client_task_id": "task_1",
+ "title": f"{TEST_PREFIX}task-{suffix}-1",
+ "task_type": task_type,
+ "priority": "medium",
+ "input_payload": {"text": "hello"},
+ "expected_output_schema": {"type": "object"},
+ "dependency_client_task_ids": [],
+ },
+ {
+ "client_task_id": "task_2",
+ "title": f"{TEST_PREFIX}task-{suffix}-2",
+ "task_type": task_type,
+ "priority": "medium",
+ "input_payload": {"text": "world"},
+ "expected_output_schema": {"type": "object"},
+ "dependency_client_task_ids": ["task_1"],
+ },
+ {
+ "client_task_id": "task_3",
+ "title": f"{TEST_PREFIX}task-{suffix}-3",
+ "task_type": task_type,
+ "priority": "medium",
+ "input_payload": {"text": "!"},
+ "expected_output_schema": {"type": "object"},
+ "dependency_client_task_ids": [],
+ },
+ ],
+ }
+
+
+_cleanup_database()
+
+from src.apps.api.app import app # noqa: E402
+from src.packages.core.db.models import AssignmentORM, EventLogORM, ExecutionRunORM, TaskORM # noqa: E402
+from src.packages.core.task_state_machine import transition_task_status # noqa: E402
+
+
+client = TestClient(app)
+
+
+def setup_function() -> None:
+ _cleanup_database()
+
+
+def teardown_function() -> None:
+ _cleanup_database()
+
+
+def test_task_timeline_rebuilds_full_lifecycle_with_retry() -> None:
+ suffix = uuid.uuid4().hex[:8]
+ role = _register_agent(client, role_name=f"{TEST_PREFIX}worker-{suffix}", supported_task_types=["generate"])
+ response = client.post("/task-batches", json=_batch_payload("generate", suffix))
+ assert response.status_code == 201
+ task_id = response.json()["tasks"][0]["task_id"]
+
+ engine = create_engine(_database_url())
+ with Session(engine) as session:
+ task = session.get(TaskORM, task_id)
+ assert task is not None
+ assignment = session.query(AssignmentORM).filter(AssignmentORM.task_id == task.id).first()
+ assert assignment is not None
+ first_run = ExecutionRunORM(
+ task_id=task.id,
+ agent_role_id=assignment.agent_role_id,
+ run_status="failed",
+ started_at=datetime.now(timezone.utc),
+ finished_at=datetime.now(timezone.utc) + timedelta(seconds=1),
+ error_message="first attempt failed",
+ input_snapshot={"attempt": 1},
+ output_snapshot={},
+ )
+ session.add(first_run)
+ session.flush()
+ session.add(
+ EventLogORM(
+ batch_id=task.batch_id,
+ task_id=task.id,
+ run_id=first_run.id,
+ event_type="execution_run_started",
+ event_status="running",
+ message="worker started execution run",
+ payload={"task_id": task.id, "run_id": first_run.id, "source": "worker"},
+ )
+ )
+ transition_task_status(session, task, "running", "worker claimed queued task", "worker", run_id=first_run.id)
+ transition_task_status(session, task, "failed", "worker execution failed", "worker", run_id=first_run.id)
+ transition_task_status(session, task, "queued", "retry requested", "review")
+ second_run = ExecutionRunORM(
+ task_id=task.id,
+ agent_role_id=assignment.agent_role_id,
+ run_status="success",
+ started_at=datetime.now(timezone.utc) + timedelta(seconds=2),
+ finished_at=datetime.now(timezone.utc) + timedelta(seconds=3),
+ input_snapshot={"attempt": 2},
+ output_snapshot={"artifact": "report"},
+ )
+ session.add(second_run)
+ session.flush()
+ session.add(
+ EventLogORM(
+ batch_id=task.batch_id,
+ task_id=task.id,
+ run_id=second_run.id,
+ event_type="execution_run_finished",
+ event_status="success",
+ message="worker completed execution run",
+ payload={"task_id": task.id, "run_id": second_run.id, "source": "worker"},
+ )
+ )
+ transition_task_status(session, task, "running", "worker claimed queued task", "worker", run_id=second_run.id)
+ transition_task_status(session, task, "success", "worker finished task successfully", "worker", run_id=second_run.id)
+ session.commit()
+
+ timeline_response = client.get(f"/tasks/{task_id}/timeline")
+ assert timeline_response.status_code == 200
+ payload = timeline_response.json()
+ stages = [item["stage"] for item in payload["items"]]
+ assert stages[0] == "created"
+ assert "routed" in stages
+ assert "queued" in stages
+ assert "running" in stages
+ assert "failed" in stages
+ assert "retry" in stages
+ assert "completed" in stages
+
+
+def test_task_timeline_includes_review_stage_and_batch_timeline_merges_task_events() -> None:
+ suffix = uuid.uuid4().hex[:8]
+ response = client.post("/task-batches", json=_batch_payload("unmatched_type", suffix))
+ assert response.status_code == 201
+ batch_id = response.json()["batch_id"]
+ task_id = response.json()["tasks"][0]["task_id"]
+
+ task_timeline = client.get(f"/tasks/{task_id}/timeline")
+ assert task_timeline.status_code == 200
+ task_stages = [item["stage"] for item in task_timeline.json()["items"]]
+ assert "review" in task_stages
+
+ batch_timeline = client.get(f"/task-batches/{batch_id}/timeline")
+ assert batch_timeline.status_code == 200
+ items = batch_timeline.json()["items"]
+ assert items[0]["title"] == "Batch created"
+ assert any(item["task_id"] == task_id for item in items)
+ assert any(item["stage"] == "review" for item in items)
+
+
+def test_timeline_endpoints_return_404_for_unknown_resources() -> None:
+ assert client.get("/tasks/not_found/timeline").status_code == 404
+ assert client.get("/task-batches/not_found/timeline").status_code == 404
diff --git a/src/tests/test_run_detail_page.py b/src/tests/test_run_detail_page.py
index dbab614..fdad5e5 100644
--- a/src/tests/test_run_detail_page.py
+++ b/src/tests/test_run_detail_page.py
@@ -228,6 +228,16 @@ def test_console_run_detail_page_is_accessible() -> None:
assert "/console/assets/run-detail.js" in response.text
+def test_run_detail_page_assets_include_task_lifecycle_timeline() -> None:
+ page_response = client.get("/console/runs/sample-run-id")
+ assert page_response.status_code == 200
+ assert "Lifecycle timeline" in page_response.text
+
+ asset_response = client.get("/console/assets/run-detail.js")
+ assert asset_response.status_code == 200
+ assert "/tasks/${detail.task.task_id}/timeline" in asset_response.text
+
+
def test_batch_detail_assets_link_to_run_detail_page() -> None:
response = client.get("/console/assets/batch-detail.js")
assert response.status_code == 200
diff --git a/src/tests/test_task_status_history_api.py b/src/tests/test_task_status_history_api.py
new file mode 100644
index 0000000..72071a3
--- /dev/null
+++ b/src/tests/test_task_status_history_api.py
@@ -0,0 +1,175 @@
+from __future__ import annotations
+
+import os
+import sys
+import uuid
+from pathlib import Path
+
+from fastapi.testclient import TestClient
+from sqlalchemy import create_engine, text
+from sqlalchemy.orm import Session
+
+
+ROOT = Path(__file__).resolve().parents[2]
+TEST_PREFIX = "task-status-history-test-"
+
+if str(ROOT) not in sys.path:
+ sys.path.insert(0, str(ROOT))
+
+
+def _database_url() -> str:
+ database_url = os.getenv("DATABASE_URL")
+ if not database_url:
+ env_file = ROOT / ".env"
+ if env_file.exists():
+ for line in env_file.read_text(encoding="utf-8").splitlines():
+ if line.startswith("DATABASE_URL="):
+ database_url = line.split("=", 1)[1].strip()
+ break
+ if not database_url:
+ raise RuntimeError("DATABASE_URL is not set")
+ return database_url
+
+
+def _cleanup_database() -> None:
+ engine = create_engine(_database_url())
+ with engine.begin() as conn:
+ conn.execute(
+ text("DELETE FROM task_batches WHERE title LIKE :prefix"),
+ {"prefix": f"{TEST_PREFIX}%"},
+ )
+
+
+_cleanup_database()
+
+from src.apps.api.app import app # noqa: E402
+from src.packages.core.db.models import EventLogORM, TaskBatchORM, TaskORM # noqa: E402
+from src.packages.core.task_state_machine import transition_task_status # noqa: E402
+
+
+client = TestClient(app)
+
+
+def setup_function() -> None:
+ _cleanup_database()
+
+
+def teardown_function() -> None:
+ _cleanup_database()
+
+
+def _create_task(initial_status: str = "pending") -> str:
+ engine = create_engine(_database_url())
+ suffix = uuid.uuid4().hex[:8]
+ with Session(engine) as session:
+ batch = TaskBatchORM(
+ title=f"{TEST_PREFIX}{suffix}",
+ description="task status history batch",
+ created_by="pytest",
+ status="draft",
+ total_tasks=1,
+ metadata_json={"suite": "task-status-history"},
+ )
+ session.add(batch)
+ session.flush()
+ task = TaskORM(
+ batch_id=batch.id,
+ title=f"{TEST_PREFIX}task-{suffix}",
+ description="task status history task",
+ task_type="generate",
+ priority="medium",
+ status=initial_status,
+ input_payload={},
+ expected_output_schema={},
+ assigned_agent_role=None,
+ dependency_ids=[],
+ retry_count=0,
+ )
+        session.add(task)
+        session.commit()
+        return task.id
+
+
+def test_status_history_returns_status_changes_in_order() -> None:
+ engine = create_engine(_database_url())
+ task_id = _create_task("pending")
+
+ with Session(engine) as session:
+ task = session.get(TaskORM, task_id)
+ assert task is not None
+ transition_task_status(session, task, "queued", "routed to worker", "router")
+ transition_task_status(session, task, "running", "worker started", "worker")
+ transition_task_status(session, task, "success", "worker finished", "worker")
+ session.commit()
+
+ response = client.get(f"/tasks/{task_id}/status-history")
+ assert response.status_code == 200
+ payload = response.json()
+
+ assert [item["new_status"] for item in payload] == ["queued", "running", "success"]
+ assert payload[0]["task_id"] == task_id
+ assert payload[0]["old_status"] == "pending"
+ assert payload[0]["new_status"] == "queued"
+ assert payload[0]["reason"] == "routed to worker"
+ assert payload[0]["actor"] == "router"
+ assert payload[0]["timestamp"]
+
+
+def test_status_history_filters_out_non_status_events() -> None:
+ engine = create_engine(_database_url())
+ task_id = _create_task("pending")
+
+ with Session(engine) as session:
+ task = session.get(TaskORM, task_id)
+ assert task is not None
+ transition_task_status(session, task, "queued", "routed to worker", "router")
+ session.add(
+ EventLogORM(
+ batch_id=task.batch_id,
+ task_id=task.id,
+ event_type="execution_run_started",
+ event_status="running",
+ message="worker started execution run",
+ payload={"task_id": task.id, "source": "worker"},
+ )
+ )
+ session.commit()
+
+ response = client.get(f"/tasks/{task_id}/status-history")
+ assert response.status_code == 200
+ payload = response.json()
+
+ assert len(payload) == 1
+ assert payload[0]["new_status"] == "queued"
+
+
+def test_status_history_returns_empty_list_when_no_transition_exists() -> None:
+ task_id = _create_task("pending")
+
+ response = client.get(f"/tasks/{task_id}/status-history")
+ assert response.status_code == 200
+ assert response.json() == []
+
+
+def test_status_history_returns_404_for_unknown_task() -> None:
+ response = client.get("/tasks/not_found/status-history")
+ assert response.status_code == 404
+ assert response.json()["detail"] == "Task not found"
+
+
+def test_status_history_includes_cancel_transition_from_api() -> None:
+ task_id = _create_task("pending")
+
+ cancel_response = client.post(f"/tasks/{task_id}/cancel", json={"reason": "manual stop"})
+ assert cancel_response.status_code == 200
+ assert cancel_response.json()["status"] == "cancelled"
+
+ response = client.get(f"/tasks/{task_id}/status-history")
+ assert response.status_code == 200
+ payload = response.json()
+
+ assert len(payload) == 1
+ assert payload[0]["old_status"] == "pending"
+ assert payload[0]["new_status"] == "cancelled"
+ assert payload[0]["reason"] == "manual stop"
+ assert payload[0]["actor"] == "api"