# Directory holding the static console front-end (src/apps/web); the diff adds
# index.html, app.js and styles.css there.
WEB_DIR = Path(__file__).resolve().parents[1] / "web"
# Static assets (app.js, styles.css) are served under /console/assets.
app.mount("/console/assets", StaticFiles(directory=WEB_DIR), name="console-assets")


@app.get("/console/batches")
def console_batches() -> FileResponse:
    """Serve the batch-console single page (index.html) at /console/batches."""
    return FileResponse(WEB_DIR / "index.html")
= ( + { + "role_name": "search_agent", + "description": "Built-in search agent for research-oriented tasks", + "capabilities": ["task:search", "task:research_topic"], + "input_schema": { + "supported_task_types": [], + "input_requirements": {"properties": {"query": {"type": "string"}}}, + "supports_concurrency": True, + "allows_auto_retry": False, + }, + "output_schema": {"output_contract": {"type": "object"}}, + }, + { + "role_name": "code_agent", + "description": "Built-in code agent for implementation-oriented tasks", + "capabilities": ["task:code", "task:implement_feature"], + "input_schema": { + "supported_task_types": [], + "input_requirements": { + "properties": { + "prompt": {"type": "string"}, + "language": {"type": "string"}, + } + }, + "supports_concurrency": True, + "allows_auto_retry": False, + }, + "output_schema": {"output_contract": {"type": "object"}}, + }, { "role_name": "planner_agent", "description": "Built-in planner for demo preprocessing", diff --git a/src/apps/api/routers/task_batches.py b/src/apps/api/routers/task_batches.py index f283ef0..c2445a1 100644 --- a/src/apps/api/routers/task_batches.py +++ b/src/apps/api/routers/task_batches.py @@ -2,7 +2,9 @@ from collections import deque -from fastapi import APIRouter, Depends, HTTPException, status +from datetime import datetime, timezone + +from fastapi import APIRouter, Depends, HTTPException, Query, status from sqlalchemy import select from sqlalchemy.orm import Session @@ -22,6 +24,8 @@ BatchCountsRead, BatchProgressRead, BatchTaskResultRead, + TaskBatchListItemRead, + TaskBatchListResponse, TaskBatchRead, TaskBatchSummaryRead, TaskBatchSubmitRequest, @@ -34,6 +38,10 @@ router = APIRouter(prefix="/task-batches", tags=["task-batches"]) +def _now() -> datetime: + return datetime.now(timezone.utc) + + def _build_batch_counts(tasks: list[TaskORM]) -> BatchCountsRead: counts = { "pending_count": 0, @@ -132,6 +140,32 @@ def _load_batch_artifacts(task_ids: list[str], db: Session) -> 
def _batch_updated_at(task_batch: TaskBatchORM, tasks: list[TaskORM]) -> datetime:
    """Most recent task update in the batch; batch creation time when empty."""
    if not tasks:
        return task_batch.created_at
    return max(task.updated_at for task in tasks)


def _build_batch_list_item(task_batch: TaskBatchORM, tasks: list[TaskORM]) -> TaskBatchListItemRead:
    """Build the list-view summary row for one batch from its task rows."""
    counts = _build_batch_counts(tasks)
    progress = _build_batch_progress(tasks)
    total_tasks = task_batch.total_tasks or len(tasks)
    # Guard against division by zero for batches persisted with no tasks.
    success_rate = 0.0 if total_tasks == 0 else round((counts.success_count / total_tasks) * 100, 2)
    return TaskBatchListItemRead(
        batch_id=task_batch.id,
        title=task_batch.title,
        created_at=task_batch.created_at,
        updated_at=_batch_updated_at(task_batch, tasks),
        total_tasks=total_tasks,
        derived_status=_derive_batch_status(tasks),
        success_rate=success_rate,
        completed_count=progress.completed_count,
        success_count=counts.success_count,
        failed_count=counts.failed_count,
        cancelled_count=counts.cancelled_count,
    )


@router.get("", response_model=TaskBatchListResponse)
def list_task_batches(
    status_filter: str | None = Query(default=None, alias="status"),
    search: str | None = None,
    sort: str = "created_at_desc",
    db: Session = Depends(get_db),
) -> TaskBatchListResponse:
    """List task batches with optional title search, status filter and sorting.

    ``updated_at`` and ``derived_status`` are computed from task rows (there is
    no corresponding column on TaskBatchORM), so the ``updated_at_*`` sorts run
    in SQL on ``created_at`` first and the built items are re-sorted in Python,
    and the status filter is applied after the summaries are built.
    """
    query = select(TaskBatchORM)

    if search:
        query = query.where(TaskBatchORM.title.ilike(f"%{search.strip()}%"))

    # NOTE: updated_at_* intentionally fall back to created_at at the SQL
    # level; the real updated_at ordering happens below, in Python.
    sort_mapping = {
        "created_at_desc": TaskBatchORM.created_at.desc(),
        "created_at_asc": TaskBatchORM.created_at.asc(),
        "updated_at_desc": TaskBatchORM.created_at.desc(),
        "updated_at_asc": TaskBatchORM.created_at.asc(),
    }
    batches = db.scalars(query.order_by(sort_mapping.get(sort, TaskBatchORM.created_at.desc()))).all()
    if not batches:
        return TaskBatchListResponse(items=[])

    batch_ids = [batch.id for batch in batches]
    tasks = db.scalars(
        select(TaskORM)
        .where(TaskORM.batch_id.in_(batch_ids))
        .order_by(TaskORM.created_at.asc(), TaskORM.id.asc())
    ).all()

    # Every fetched task belongs to a listed batch (the IN clause above), so
    # direct indexing into the pre-seeded mapping is safe — no setdefault needed.
    tasks_by_batch: dict[str, list[TaskORM]] = {batch_id: [] for batch_id in batch_ids}
    for task in tasks:
        tasks_by_batch[task.batch_id].append(task)

    items = [
        _build_batch_list_item(task_batch, tasks_by_batch[task_batch.id])
        for task_batch in batches
    ]

    if status_filter:
        items = [item for item in items if item.derived_status == status_filter]

    if sort == "updated_at_desc":
        items.sort(key=lambda item: item.updated_at, reverse=True)
    elif sort == "updated_at_asc":
        items.sort(key=lambda item: item.updated_at)

    return TaskBatchListResponse(items=items)
${message}
`; +} + +function renderBatches(items) { + if (!items.length) { + renderEmpty("No batches matched the current filters."); + return; + } + + batchGrid.innerHTML = items + .map( + (item) => ` +
+
+
+

${item.title}

+

${item.batch_id}

+
+ ${item.derived_status} +
+
+
Total tasks
${item.total_tasks}
+
Success rate
${item.success_rate}%
+
Completed
${item.completed_count}
+
Success
${item.success_count}
+
Failed
${item.failed_count}
+
Cancelled
${item.cancelled_count}
+
+
+

Created: ${formatDate(item.created_at)}

+

Updated: ${formatDate(item.updated_at)}

+
+
+ `, + ) + .join(""); +} + +async function loadBatches() { + const params = new URLSearchParams(); + if (searchInput.value.trim()) { + params.set("search", searchInput.value.trim()); + } + if (statusSelect.value) { + params.set("status", statusSelect.value); + } + params.set("sort", sortSelect.value); + + statusText.textContent = "Loading batches..."; + try { + const response = await fetch(`/task-batches?${params.toString()}`); + if (!response.ok) { + throw new Error(`Request failed with status ${response.status}`); + } + const payload = await response.json(); + statusText.textContent = `${payload.items.length} batch${payload.items.length === 1 ? "" : "es"} shown`; + renderBatches(payload.items); + } catch (error) { + statusText.textContent = "Unable to load batches."; + renderEmpty(error.message); + } +} + +searchInput.addEventListener("input", () => { + loadBatches(); +}); +statusSelect.addEventListener("change", () => { + loadBatches(); +}); +sortSelect.addEventListener("change", () => { + loadBatches(); +}); +refreshButton.addEventListener("click", () => { + loadBatches(); +}); + +loadBatches(); diff --git a/src/apps/web/index.html b/src/apps/web/index.html new file mode 100644 index 0000000..3f7d68f --- /dev/null +++ b/src/apps/web/index.html @@ -0,0 +1,56 @@ + + + + + + Batch Console + + + +
+
+

Operations Console

+

Batch Console

+

Inspect task batches, isolate unhealthy runs, and track overall throughput.

+
+ +
+ + + + +
+ +
+

Loading batches...

+
+ +
+
+ + + + diff --git a/src/apps/web/styles.css b/src/apps/web/styles.css new file mode 100644 index 0000000..563490a --- /dev/null +++ b/src/apps/web/styles.css @@ -0,0 +1,200 @@ +:root { + --bg: #f4efe6; + --panel: #fffaf2; + --ink: #1f2520; + --muted: #667267; + --line: #dccfb8; + --accent: #194c3d; + --accent-soft: #d8ebe3; + --danger: #a43f2f; + --danger-soft: #f3d6cf; + --warning: #a16a1d; + --warning-soft: #f7e7bf; + --info: #255a8a; + --info-soft: #dbe8f4; + --neutral-soft: #e5dfd1; +} + +* { + box-sizing: border-box; +} + +body { + margin: 0; + background: + radial-gradient(circle at top left, rgba(25, 76, 61, 0.08), transparent 30%), + linear-gradient(180deg, #f8f3ea 0%, var(--bg) 100%); + color: var(--ink); + font-family: Georgia, "Times New Roman", serif; +} + +.page-shell { + max-width: 1200px; + margin: 0 auto; + padding: 40px 20px 56px; +} + +.hero h1 { + margin: 8px 0; + font-size: clamp(2.4rem, 5vw, 4.2rem); + line-height: 0.95; +} + +.eyebrow { + margin: 0; + color: var(--accent); + text-transform: uppercase; + letter-spacing: 0.14em; + font-size: 0.82rem; +} + +.subtitle { + max-width: 720px; + color: var(--muted); + font-size: 1.05rem; +} + +.toolbar { + display: grid; + grid-template-columns: repeat(4, minmax(0, 1fr)); + gap: 12px; + margin-top: 28px; + padding: 16px; + border: 1px solid var(--line); + border-radius: 20px; + background: rgba(255, 250, 242, 0.9); + backdrop-filter: blur(10px); +} + +.field { + display: flex; + flex-direction: column; + gap: 8px; + font-size: 0.92rem; + color: var(--muted); +} + +.field input, +.field select, +.refresh-button { + height: 44px; + border: 1px solid var(--line); + border-radius: 12px; + background: #fffdf8; + color: var(--ink); + font: inherit; + padding: 0 14px; +} + +.refresh-button { + align-self: end; + background: var(--accent); + color: #f4efe6; + border: none; + cursor: pointer; +} + +.status-row { + margin: 18px 2px 0; + color: var(--muted); +} + +.batch-grid { + display: grid; + 
grid-template-columns: repeat(3, minmax(0, 1fr)); + gap: 16px; + margin-top: 18px; +} + +.batch-card, +.empty-state { + padding: 18px; + border: 1px solid var(--line); + border-radius: 22px; + background: var(--panel); + box-shadow: 0 12px 32px rgba(31, 37, 32, 0.06); +} + +.batch-card:hover { + transform: translateY(-2px); + transition: transform 160ms ease; +} + +.card-top { + display: flex; + justify-content: space-between; + gap: 12px; +} + +.card-top h2 { + margin: 0; + font-size: 1.3rem; +} + +.batch-id { + margin: 6px 0 0; + color: var(--muted); + font-size: 0.82rem; + word-break: break-all; +} + +.status-badge { + align-self: flex-start; + padding: 6px 10px; + border-radius: 999px; + font-size: 0.8rem; + text-transform: capitalize; + white-space: nowrap; +} + +.status-success { background: var(--accent-soft); color: var(--accent); } +.status-failed, .status-partially_failed { background: var(--danger-soft); color: var(--danger); } +.status-running { background: var(--info-soft); color: var(--info); } +.status-needs_review { background: var(--warning-soft); color: var(--warning); } +.status-pending, .status-cancelled { background: var(--neutral-soft); color: #5d655f; } + +.metrics { + display: grid; + grid-template-columns: repeat(3, minmax(0, 1fr)); + gap: 12px; + margin: 18px 0; +} + +.metrics div { + padding: 10px 12px; + border-radius: 14px; + background: #fff; +} + +.metrics dt { + color: var(--muted); + font-size: 0.78rem; +} + +.metrics dd { + margin: 6px 0 0; + font-size: 1.1rem; +} + +.timestamps { + color: var(--muted); + font-size: 0.88rem; +} + +@media (max-width: 960px) { + .toolbar { + grid-template-columns: 1fr 1fr; + } + + .batch-grid { + grid-template-columns: 1fr 1fr; + } +} + +@media (max-width: 640px) { + .toolbar, + .batch-grid, + .metrics { + grid-template-columns: 1fr; + } +} diff --git a/src/apps/worker/builtin_agents.py b/src/apps/worker/builtin_agents.py index d04cb57..5a79043 100644 --- a/src/apps/worker/builtin_agents.py +++ 
b/src/apps/worker/builtin_agents.py @@ -1,5 +1,6 @@ from __future__ import annotations +from dataclasses import is_dataclass from typing import Any, Protocol from src.packages.core.db.models import TaskORM @@ -10,6 +11,23 @@ def run(self, task: TaskORM, context: dict[str, Any]) -> dict[str, Any]: ... +def _serialize_context(context: Any) -> dict[str, Any]: + if isinstance(context, dict): + return context + + if is_dataclass(context): + return { + "run_id": getattr(context, "run_id", None), + "task_id": getattr(context, "task_id", None), + "agent_role_name": getattr(context, "agent_role_name", None), + "started_at": getattr(context, "started_at", None).isoformat() + if getattr(context, "started_at", None) is not None + else None, + } + + return {"value": str(context)} + + class EchoWorkerAgent: def run(self, task: TaskORM, context: dict[str, Any]) -> dict[str, Any]: return { @@ -17,7 +35,7 @@ def run(self, task: TaskORM, context: dict[str, Any]) -> dict[str, Any]: "task_id": task.id, "task_type": task.task_type, "echo": task.input_payload, - "context": context, + "context": _serialize_context(context), } @@ -38,7 +56,47 @@ def run(self, task: TaskORM, context: dict[str, Any]) -> dict[str, Any]: "normalized_text": text, "tags": tags, "steps": steps, - "context": context, + "context": _serialize_context(context), + } + + +class SearchWorkerAgent: + def run(self, task: TaskORM, context: dict[str, Any]) -> dict[str, Any]: + query = str(task.input_payload.get("query", task.input_payload.get("text", ""))).strip() + keywords = [part for part in query.split(" ") if part] + return { + "status": "ok", + "stage": "search", + "task_id": task.id, + "query": query, + "search_plan": { + "keywords": keywords, + "sources": ["web_search", "docs_index"], + "intent": "research", + }, + "context": _serialize_context(context), + } + + +class CodeWorkerAgent: + def run(self, task: TaskORM, context: dict[str, Any]) -> dict[str, Any]: + prompt = str(task.input_payload.get("prompt", 
task.input_payload.get("text", ""))).strip() + language = str(task.input_payload.get("language", "python")).strip() or "python" + summary = prompt if prompt else f"implement task_type={task.task_type}" + return { + "status": "ok", + "stage": "code", + "task_id": task.id, + "code_plan": { + "language": language, + "summary": summary, + "steps": [ + "inspect existing code", + "apply minimal change", + "run targeted tests", + ], + }, + "context": _serialize_context(context), } @@ -52,7 +110,7 @@ def run(self, task: TaskORM, context: dict[str, Any]) -> dict[str, Any]: "summary": f"processed task_type={task.task_type}", "input": task.input_payload, }, - "context": context, + "context": _serialize_context(context), } @@ -70,5 +128,5 @@ def run(self, task: TaskORM, context: dict[str, Any]) -> dict[str, Any]: "notes": "manual review required due to failed validation" if needs_manual_review else "auto review passed", - "context": context, + "context": _serialize_context(context), } diff --git a/src/apps/worker/registry.py b/src/apps/worker/registry.py index 8aefc2e..53dab1b 100644 --- a/src/apps/worker/registry.py +++ b/src/apps/worker/registry.py @@ -5,11 +5,13 @@ from src.packages.core.db.models import TaskORM from src.apps.worker.builtin_agents import ( + CodeWorkerAgent, DefaultWorkerAgent as BuiltinDefaultWorkerAgent, EchoWorkerAgent, FailingWorkerAgent, PlannerWorkerAgent, ReviewerWorkerAgent, + SearchWorkerAgent, WorkerAgent, ) @@ -47,12 +49,21 @@ def get(self, role_name: str) -> AgentRunner | None: def build_default_registry() -> AgentRegistry: registry = AgentRegistry() registry.register("default_worker", DefaultWorkerAgent()) + registry.register("search_agent", SearchWorkerAgent()) + registry.register("code_agent", CodeWorkerAgent()) + registry.register("planner_agent", PlannerWorkerAgent()) + registry.register("worker_agent", BuiltinDefaultWorkerAgent()) + registry.register("reviewer_agent", ReviewerWorkerAgent()) + registry.register("echo_worker", 
class TaskBatchListItemRead(SchemaModel):
    """Summary row returned by the batch list endpoint (GET /task-batches)."""

    batch_id: str
    title: str
    created_at: datetime
    # Derived from task rows: latest task.updated_at, falling back to the
    # batch's created_at when the batch has no tasks.
    updated_at: datetime
    total_tasks: int
    # Aggregate status derived from the batch's task states.
    derived_status: str
    # Percentage 0-100, rounded to two decimals.
    success_rate: float
    completed_count: int
    success_count: int
    failed_count: int
    cancelled_count: int


class TaskBatchListResponse(SchemaModel):
    """Envelope for batch list results; items is empty when nothing matches."""

    items: list[TaskBatchListItemRead] = Field(default_factory=list)
noqa: E402 +from src.apps.worker.registry import build_default_registry # noqa: E402 def setup_function() -> None: _cleanup_database() + ensure_builtin_agent_roles() def teardown_function() -> None: @@ -104,10 +104,18 @@ def test_builtin_roles_seeded_once_and_demo_chain_runs() -> None: reviewer_count = conn.execute( text("SELECT count(*) FROM agent_roles WHERE role_name = 'reviewer_agent'") ).scalar_one() + search_count = conn.execute( + text("SELECT count(*) FROM agent_roles WHERE role_name = 'search_agent'") + ).scalar_one() + code_count = conn.execute( + text("SELECT count(*) FROM agent_roles WHERE role_name = 'code_agent'") + ).scalar_one() assert planner_count == 1 assert worker_count == 1 assert reviewer_count == 1 + assert search_count == 1 + assert code_count == 1 suffix = uuid.uuid4().hex[:8] create_response = client.post("/task-batches", json=_demo_payload(suffix)) @@ -116,10 +124,10 @@ def test_builtin_roles_seeded_once_and_demo_chain_runs() -> None: created_ids = [task["task_id"] for task in created["tasks"]] with Session(engine) as session: - worker = WorkerService(session) - first_run = worker.run_once() - second_run = worker.run_once() - third_run = worker.run_once() + registry = build_default_registry() + first_run = run_next_task(session, registry) + second_run = run_next_task(session, registry) + third_run = run_next_task(session, registry) assert first_run is not None assert second_run is not None assert third_run is not None @@ -158,4 +166,81 @@ def test_builtin_roles_seeded_once_and_demo_chain_runs() -> None: for event in events_response.json() if event["event_type"] == "task_status_changed" ] - assert statuses == ["queued", "running", "success"] + assert statuses[-2:] == ["running", "success"] + assert statuses[0] in {"queued", "blocked"} + + +def test_builtin_search_and_code_roles_execute_distinct_outputs() -> None: + with TestClient(app) as client: + engine = create_engine(_database_url()) + suffix = uuid.uuid4().hex[:8] + payload = { + 
"title": f"{DEMO_PREFIX}search-code-{suffix}", + "description": "built-in search and code demo", + "created_by": "pytest", + "metadata": {"suite": "builtin-search-code"}, + "tasks": [ + { + "client_task_id": "task_1", + "title": f"{DEMO_PREFIX}search-{suffix}", + "task_type": "research_topic", + "priority": "medium", + "input_payload": {"query": "python worker patterns"}, + "expected_output_schema": {"type": "object"}, + "dependency_client_task_ids": [], + }, + { + "client_task_id": "task_2", + "title": f"{DEMO_PREFIX}code-{suffix}", + "task_type": "implement_feature", + "priority": "medium", + "input_payload": {"prompt": "add worker retries", "language": "python"}, + "expected_output_schema": {"type": "object"}, + "dependency_client_task_ids": [], + }, + { + "client_task_id": "task_3", + "title": f"{DEMO_PREFIX}filler-{suffix}", + "task_type": "research_topic", + "priority": "medium", + "input_payload": {"query": "queue locking strategy"}, + "expected_output_schema": {"type": "object"}, + "dependency_client_task_ids": [], + }, + ], + } + + create_response = client.post("/task-batches", json=payload) + assert create_response.status_code == 201 + created = create_response.json()["tasks"] + assignments = {task["title"]: task["assigned_agent_role"] for task in created} + assert assignments[f"{DEMO_PREFIX}search-{suffix}"] == "search_agent" + assert assignments[f"{DEMO_PREFIX}code-{suffix}"] == "code_agent" + + with Session(engine) as session: + registry = build_default_registry() + first_run = run_next_task(session, registry) + second_run = run_next_task(session, registry) + third_run = run_next_task(session, registry) + assert first_run is not None + assert second_run is not None + assert third_run is not None + run_ids = [first_run.id, second_run.id, third_run.id] + + run_payloads = [] + for run_id in run_ids: + run_response = client.get(f"/runs/{run_id}") + assert run_response.status_code == 200 + run_payloads.append(run_response.json()) + + stages = 
{payload["output_snapshot"].get("stage") for payload in run_payloads} + assert "search" in stages + assert "code" in stages + assert any( + payload["output_snapshot"].get("search_plan", {}).get("intent") == "research" + for payload in run_payloads + ) + assert any( + payload["output_snapshot"].get("code_plan", {}).get("language") == "python" + for payload in run_payloads + ) diff --git a/src/tests/test_task_batch_list.py b/src/tests/test_task_batch_list.py new file mode 100644 index 0000000..a0bf433 --- /dev/null +++ b/src/tests/test_task_batch_list.py @@ -0,0 +1,180 @@ +from __future__ import annotations + +import os +import sys +import uuid +from pathlib import Path + +from fastapi.testclient import TestClient +from sqlalchemy import create_engine, text +from sqlalchemy.orm import Session + + +ROOT = Path(__file__).resolve().parents[2] +TEST_PREFIX = "batch-list-test-" + +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + + +def _database_url() -> str: + database_url = os.getenv("DATABASE_URL") + if not database_url: + raise RuntimeError("DATABASE_URL is not set") + return database_url + + +def _cleanup_database() -> None: + engine = create_engine(_database_url()) + with engine.begin() as conn: + conn.execute(text("DELETE FROM task_batches")) + conn.execute(text("DELETE FROM agent_roles")) + + +def _register_agent(client: TestClient, *, role_name: str, supported_task_types: list[str]) -> None: + payload = { + "role_name": role_name, + "description": "batch list role", + "capabilities": [f"task:{task_type}" for task_type in supported_task_types] or ["default_worker"], + "capability_declaration": { + "supported_task_types": supported_task_types, + "input_requirements": {"properties": {"text": {"type": "string"}}}, + "output_contract": {"type": "object"}, + "supports_concurrency": True, + "allows_auto_retry": False, + }, + "input_schema": {}, + "output_schema": {}, + "timeout_seconds": 300, + "max_retries": 0, + "enabled": True, + "version": "1.0.0", + 
} + response = client.post("/agents/register", json=payload) + assert response.status_code in {201, 400} + + +def _batch_payload(title: str, task_type: str) -> dict: + return { + "title": title, + "description": "batch list batch", + "created_by": "pytest", + "metadata": {"suite": "batch-list"}, + "tasks": [ + { + "client_task_id": "task_1", + "title": f"{title}-1", + "task_type": task_type, + "priority": "medium", + "input_payload": {"text": "alpha"}, + "expected_output_schema": {"type": "object"}, + "dependency_client_task_ids": [], + }, + { + "client_task_id": "task_2", + "title": f"{title}-2", + "task_type": task_type, + "priority": "medium", + "input_payload": {"text": "beta"}, + "expected_output_schema": {"type": "object"}, + "dependency_client_task_ids": [], + }, + { + "client_task_id": "task_3", + "title": f"{title}-3", + "task_type": task_type, + "priority": "medium", + "input_payload": {"text": "gamma"}, + "expected_output_schema": {"type": "object"}, + "dependency_client_task_ids": [], + }, + ], + } + + +_cleanup_database() + +from src.apps.api.app import app # noqa: E402 + + +client = TestClient(app) + + +def setup_function() -> None: + _cleanup_database() + + +def teardown_function() -> None: + _cleanup_database() + + +def test_list_task_batches_returns_summary_fields() -> None: + suffix = uuid.uuid4().hex[:8] + _register_agent(client, role_name=f"{TEST_PREFIX}generate-{suffix}", supported_task_types=["generate"]) + + response = client.post("/task-batches", json=_batch_payload(f"{TEST_PREFIX}alpha-{suffix}", "generate")) + assert response.status_code == 201 + + list_response = client.get("/task-batches") + assert list_response.status_code == 200 + items = list_response.json()["items"] + assert len(items) == 1 + item = items[0] + assert item["title"] == f"{TEST_PREFIX}alpha-{suffix}" + assert item["total_tasks"] == 3 + assert item["derived_status"] == "pending" + assert item["success_rate"] == 0.0 + assert "updated_at" in item + + +def 
test_list_task_batches_filters_by_derived_status() -> None: + suffix = uuid.uuid4().hex[:8] + _register_agent(client, role_name="default_worker", supported_task_types=[]) + + pending_response = client.post("/task-batches", json=_batch_payload(f"{TEST_PREFIX}pending-{suffix}", "generate")) + assert pending_response.status_code == 201 + + pending_list = client.get("/task-batches", params={"status": "pending"}) + assert pending_list.status_code == 200 + assert all(item["derived_status"] == "pending" for item in pending_list.json()["items"]) + + _cleanup_database() + review_response = client.post("/task-batches", json=_batch_payload(f"{TEST_PREFIX}review-{suffix}", "no_match")) + assert review_response.status_code == 201 + + review_list = client.get("/task-batches", params={"status": "needs_review"}) + assert review_list.status_code == 200 + assert len(review_list.json()["items"]) == 1 + assert review_list.json()["items"][0]["title"] == f"{TEST_PREFIX}review-{suffix}" + + +def test_list_task_batches_supports_search_and_sort() -> None: + suffix = uuid.uuid4().hex[:8] + _register_agent(client, role_name=f"{TEST_PREFIX}generate-{suffix}", supported_task_types=["generate"]) + + first = client.post("/task-batches", json=_batch_payload(f"{TEST_PREFIX}zeta-{suffix}", "generate")) + assert first.status_code == 201 + second = client.post("/task-batches", json=_batch_payload(f"{TEST_PREFIX}alpha-{suffix}", "generate")) + assert second.status_code == 201 + + search_response = client.get("/task-batches", params={"search": "alpha"}) + assert search_response.status_code == 200 + assert len(search_response.json()["items"]) == 1 + assert search_response.json()["items"][0]["title"] == f"{TEST_PREFIX}alpha-{suffix}" + + sort_response = client.get("/task-batches", params={"sort": "created_at_asc"}) + assert sort_response.status_code == 200 + titles = [item["title"] for item in sort_response.json()["items"]] + assert titles == [f"{TEST_PREFIX}zeta-{suffix}", 
f"{TEST_PREFIX}alpha-{suffix}"] + + +def test_list_task_batches_returns_empty_list_when_no_batches_exist() -> None: + response = client.get("/task-batches") + assert response.status_code == 200 + assert response.json() == {"items": []} + + +def test_console_batches_page_is_accessible() -> None: + response = client.get("/console/batches") + assert response.status_code == 200 + assert "Batch Console" in response.text diff --git a/src/tests/test_task_routing.py b/src/tests/test_task_routing.py index 854dd0b..dc01ab0 100644 --- a/src/tests/test_task_routing.py +++ b/src/tests/test_task_routing.py @@ -123,6 +123,7 @@ def _register_agent( _cleanup_database() from src.apps.api.app import app # noqa: E402 +from src.apps.api.bootstrap import ensure_builtin_agent_roles # noqa: E402 client = TestClient(app) @@ -130,6 +131,7 @@ def _register_agent( def setup_function() -> None: _cleanup_database() + ensure_builtin_agent_roles() def teardown_function() -> None: @@ -171,6 +173,22 @@ def test_routes_tasks_by_capability_when_task_type_not_declared() -> None: assert all(task["routing_reason"] == "matched by capability=task:write_summary" for task in tasks) +def test_routes_builtin_search_and_code_roles_by_capability() -> None: + suffix = uuid.uuid4().hex[:8] + + search_response = client.post("/task-batches", json=_batch_payload("research_topic", f"{suffix}-search")) + assert search_response.status_code == 201 + search_tasks = search_response.json()["tasks"] + assert all(task["assigned_agent_role"] == "search_agent" for task in search_tasks) + assert all(task["routing_reason"] == "matched by capability=task:research_topic" for task in search_tasks) + + code_response = client.post("/task-batches", json=_batch_payload("implement_feature", f"{suffix}-code")) + assert code_response.status_code == 201 + code_tasks = code_response.json()["tasks"] + assert all(task["assigned_agent_role"] == "code_agent" for task in code_tasks) + assert all(task["routing_reason"] == "matched by 
capability=task:implement_feature" for task in code_tasks) + + def test_routes_tasks_to_default_worker_as_fallback() -> None: suffix = uuid.uuid4().hex[:8] role = _register_agent(