diff --git a/src/apps/api/app.py b/src/apps/api/app.py
index abae1bb..c6ae08b 100644
--- a/src/apps/api/app.py
+++ b/src/apps/api/app.py
@@ -1,6 +1,10 @@
from __future__ import annotations
+from pathlib import Path
+
from fastapi import FastAPI
+from fastapi.responses import FileResponse
+from fastapi.staticfiles import StaticFiles
from src.apps.api.bootstrap import ensure_builtin_agent_roles
from src.apps.api.routers import (
@@ -26,6 +30,14 @@
app.include_router(runs_router)
app.include_router(reviews_router)
+# Directory holding the console's static front-end files: the "web" folder
+# next to this module's parent directory.
+WEB_DIR = Path(__file__).resolve().parents[1] / "web"
+# Serve the contents of WEB_DIR under the /console/assets URL prefix.
+app.mount("/console/assets", StaticFiles(directory=WEB_DIR), name="console-assets")
+
+
+@app.get("/console/batches")
+def console_batches() -> FileResponse:
+ return FileResponse(WEB_DIR / "index.html")
+
@app.on_event("startup")
def bootstrap_defaults() -> None:
diff --git a/src/apps/api/bootstrap.py b/src/apps/api/bootstrap.py
index e419b6e..cbadefc 100644
--- a/src/apps/api/bootstrap.py
+++ b/src/apps/api/bootstrap.py
@@ -8,6 +8,35 @@
BUILTIN_ROLES: tuple[dict, ...] = (
+ {
+ "role_name": "search_agent",
+ "description": "Built-in search agent for research-oriented tasks",
+ "capabilities": ["task:search", "task:research_topic"],
+ "input_schema": {
+ "supported_task_types": [],
+ "input_requirements": {"properties": {"query": {"type": "string"}}},
+ "supports_concurrency": True,
+ "allows_auto_retry": False,
+ },
+ "output_schema": {"output_contract": {"type": "object"}},
+ },
+ {
+ "role_name": "code_agent",
+ "description": "Built-in code agent for implementation-oriented tasks",
+ "capabilities": ["task:code", "task:implement_feature"],
+ "input_schema": {
+ "supported_task_types": [],
+ "input_requirements": {
+ "properties": {
+ "prompt": {"type": "string"},
+ "language": {"type": "string"},
+ }
+ },
+ "supports_concurrency": True,
+ "allows_auto_retry": False,
+ },
+ "output_schema": {"output_contract": {"type": "object"}},
+ },
{
"role_name": "planner_agent",
"description": "Built-in planner for demo preprocessing",
diff --git a/src/apps/api/routers/task_batches.py b/src/apps/api/routers/task_batches.py
index f283ef0..c2445a1 100644
--- a/src/apps/api/routers/task_batches.py
+++ b/src/apps/api/routers/task_batches.py
@@ -2,7 +2,9 @@
from collections import deque
-from fastapi import APIRouter, Depends, HTTPException, status
+from datetime import datetime, timezone
+
+from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import select
from sqlalchemy.orm import Session
@@ -22,6 +24,8 @@
BatchCountsRead,
BatchProgressRead,
BatchTaskResultRead,
+ TaskBatchListItemRead,
+ TaskBatchListResponse,
TaskBatchRead,
TaskBatchSummaryRead,
TaskBatchSubmitRequest,
@@ -34,6 +38,10 @@
router = APIRouter(prefix="/task-batches", tags=["task-batches"])
+def _now() -> datetime:
+ return datetime.now(timezone.utc)
+
+
def _build_batch_counts(tasks: list[TaskORM]) -> BatchCountsRead:
counts = {
"pending_count": 0,
@@ -132,6 +140,32 @@ def _load_batch_artifacts(task_ids: list[str], db: Session) -> tuple[list[Artifa
return artifacts, artifact_counts
+def _batch_updated_at(task_batch: TaskBatchORM, tasks: list[TaskORM]) -> datetime:
+ if not tasks:
+ return task_batch.created_at
+ return max(task.updated_at for task in tasks)
+
+
+def _build_batch_list_item(task_batch: TaskBatchORM, tasks: list[TaskORM]) -> TaskBatchListItemRead:
+ counts = _build_batch_counts(tasks)
+ progress = _build_batch_progress(tasks)
+ total_tasks = task_batch.total_tasks or len(tasks)
+ success_rate = 0.0 if total_tasks == 0 else round((counts.success_count / total_tasks) * 100, 2)
+ return TaskBatchListItemRead(
+ batch_id=task_batch.id,
+ title=task_batch.title,
+ created_at=task_batch.created_at,
+ updated_at=_batch_updated_at(task_batch, tasks),
+ total_tasks=total_tasks,
+ derived_status=_derive_batch_status(tasks),
+ success_rate=success_rate,
+ completed_count=progress.completed_count,
+ success_count=counts.success_count,
+ failed_count=counts.failed_count,
+ cancelled_count=counts.cancelled_count,
+ )
+
+
def _validate_unique_client_task_ids(payload: TaskBatchSubmitRequest) -> None:
client_task_ids = [task.client_task_id for task in payload.tasks]
duplicated = {task_id for task_id in client_task_ids if client_task_ids.count(task_id) > 1}
@@ -339,6 +373,55 @@ def create_task_batch(
raise
+@router.get("", response_model=TaskBatchListResponse)
+def list_task_batches(
+ status_filter: str | None = Query(default=None, alias="status"),
+ search: str | None = None,
+ sort: str = "created_at_desc",
+ db: Session = Depends(get_db),
+) -> TaskBatchListResponse:
+ query = select(TaskBatchORM)
+
+ if search:
+ query = query.where(TaskBatchORM.title.ilike(f"%{search.strip()}%"))
+
+ sort_mapping = {
+ "created_at_desc": TaskBatchORM.created_at.desc(),
+ "created_at_asc": TaskBatchORM.created_at.asc(),
+ "updated_at_desc": TaskBatchORM.created_at.desc(),
+ "updated_at_asc": TaskBatchORM.created_at.asc(),
+ }
+ batches = db.scalars(query.order_by(sort_mapping.get(sort, TaskBatchORM.created_at.desc()))).all()
+ if not batches:
+ return TaskBatchListResponse(items=[])
+
+ batch_ids = [batch.id for batch in batches]
+ tasks = db.scalars(
+ select(TaskORM)
+ .where(TaskORM.batch_id.in_(batch_ids))
+ .order_by(TaskORM.created_at.asc(), TaskORM.id.asc())
+ ).all()
+
+ tasks_by_batch: dict[str, list[TaskORM]] = {batch_id: [] for batch_id in batch_ids}
+ for task in tasks:
+ tasks_by_batch.setdefault(task.batch_id, []).append(task)
+
+ items = [
+ _build_batch_list_item(task_batch, tasks_by_batch.get(task_batch.id, []))
+ for task_batch in batches
+ ]
+
+ if status_filter:
+ items = [item for item in items if item.derived_status == status_filter]
+
+ if sort == "updated_at_desc":
+ items.sort(key=lambda item: item.updated_at, reverse=True)
+ elif sort == "updated_at_asc":
+ items.sort(key=lambda item: item.updated_at)
+
+ return TaskBatchListResponse(items=items)
+
+
@router.get("/{batch_id}", response_model=TaskBatchRead)
def get_task_batch(batch_id: str, db: Session = Depends(get_db)) -> TaskBatchRead:
task_batch = db.get(TaskBatchORM, batch_id)
diff --git a/src/apps/web/app.js b/src/apps/web/app.js
new file mode 100644
index 0000000..01aa973
--- /dev/null
+++ b/src/apps/web/app.js
@@ -0,0 +1,93 @@
+const batchGrid = document.getElementById("batch-grid");
+const statusText = document.getElementById("status-text");
+const searchInput = document.getElementById("search-input");
+const statusSelect = document.getElementById("status-select");
+const sortSelect = document.getElementById("sort-select");
+const refreshButton = document.getElementById("refresh-button");
+
+function formatDate(value) {
+ const date = new Date(value);
+ if (Number.isNaN(date.getTime())) {
+ return value;
+ }
+ return date.toLocaleString();
+}
+
+function renderEmpty(message) {
+ batchGrid.innerHTML = `${message}`;
+}
+
+function renderBatches(items) {
+ if (!items.length) {
+ renderEmpty("No batches matched the current filters.");
+ return;
+ }
+
+ batchGrid.innerHTML = items
+ .map(
+ (item) => `
+
+
+
+
${item.title}
+
${item.batch_id}
+
+
${item.derived_status}
+
+
+ - Total tasks
- ${item.total_tasks}
+ Success rate${item.success_rate}%
+ Completed${item.completed_count}
+ Success${item.success_count}
+ Failed${item.failed_count}
+ Cancelled${item.cancelled_count}
+
+
+
Created: ${formatDate(item.created_at)}
+
Updated: ${formatDate(item.updated_at)}
+
+
+ `,
+ )
+ .join("");
+}
+
+async function loadBatches() {
+ const params = new URLSearchParams();
+ if (searchInput.value.trim()) {
+ params.set("search", searchInput.value.trim());
+ }
+ if (statusSelect.value) {
+ params.set("status", statusSelect.value);
+ }
+ params.set("sort", sortSelect.value);
+
+ statusText.textContent = "Loading batches...";
+ try {
+ const response = await fetch(`/task-batches?${params.toString()}`);
+ if (!response.ok) {
+ throw new Error(`Request failed with status ${response.status}`);
+ }
+ const payload = await response.json();
+ statusText.textContent = `${payload.items.length} batch${payload.items.length === 1 ? "" : "es"} shown`;
+ renderBatches(payload.items);
+ } catch (error) {
+ statusText.textContent = "Unable to load batches.";
+ renderEmpty(error.message);
+ }
+}
+
+searchInput.addEventListener("input", () => {
+ loadBatches();
+});
+statusSelect.addEventListener("change", () => {
+ loadBatches();
+});
+sortSelect.addEventListener("change", () => {
+ loadBatches();
+});
+refreshButton.addEventListener("click", () => {
+ loadBatches();
+});
+
+loadBatches();
diff --git a/src/apps/web/index.html b/src/apps/web/index.html
new file mode 100644
index 0000000..3f7d68f
--- /dev/null
+++ b/src/apps/web/index.html
@@ -0,0 +1,56 @@
+
+
+
+
+
+ Batch Console
+
+
+
+
+
+ Operations Console
+ Batch Console
+ Inspect task batches, isolate unhealthy runs, and track overall throughput.
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/apps/web/styles.css b/src/apps/web/styles.css
new file mode 100644
index 0000000..563490a
--- /dev/null
+++ b/src/apps/web/styles.css
@@ -0,0 +1,200 @@
+:root {
+ --bg: #f4efe6;
+ --panel: #fffaf2;
+ --ink: #1f2520;
+ --muted: #667267;
+ --line: #dccfb8;
+ --accent: #194c3d;
+ --accent-soft: #d8ebe3;
+ --danger: #a43f2f;
+ --danger-soft: #f3d6cf;
+ --warning: #a16a1d;
+ --warning-soft: #f7e7bf;
+ --info: #255a8a;
+ --info-soft: #dbe8f4;
+ --neutral-soft: #e5dfd1;
+}
+
+* {
+ box-sizing: border-box;
+}
+
+body {
+ margin: 0;
+ background:
+ radial-gradient(circle at top left, rgba(25, 76, 61, 0.08), transparent 30%),
+ linear-gradient(180deg, #f8f3ea 0%, var(--bg) 100%);
+ color: var(--ink);
+ font-family: Georgia, "Times New Roman", serif;
+}
+
+.page-shell {
+ max-width: 1200px;
+ margin: 0 auto;
+ padding: 40px 20px 56px;
+}
+
+.hero h1 {
+ margin: 8px 0;
+ font-size: clamp(2.4rem, 5vw, 4.2rem);
+ line-height: 0.95;
+}
+
+.eyebrow {
+ margin: 0;
+ color: var(--accent);
+ text-transform: uppercase;
+ letter-spacing: 0.14em;
+ font-size: 0.82rem;
+}
+
+.subtitle {
+ max-width: 720px;
+ color: var(--muted);
+ font-size: 1.05rem;
+}
+
+.toolbar {
+ display: grid;
+ grid-template-columns: repeat(4, minmax(0, 1fr));
+ gap: 12px;
+ margin-top: 28px;
+ padding: 16px;
+ border: 1px solid var(--line);
+ border-radius: 20px;
+ background: rgba(255, 250, 242, 0.9);
+ backdrop-filter: blur(10px);
+}
+
+.field {
+ display: flex;
+ flex-direction: column;
+ gap: 8px;
+ font-size: 0.92rem;
+ color: var(--muted);
+}
+
+.field input,
+.field select,
+.refresh-button {
+ height: 44px;
+ border: 1px solid var(--line);
+ border-radius: 12px;
+ background: #fffdf8;
+ color: var(--ink);
+ font: inherit;
+ padding: 0 14px;
+}
+
+.refresh-button {
+ align-self: end;
+ background: var(--accent);
+ color: #f4efe6;
+ border: none;
+ cursor: pointer;
+}
+
+.status-row {
+ margin: 18px 2px 0;
+ color: var(--muted);
+}
+
+.batch-grid {
+ display: grid;
+ grid-template-columns: repeat(3, minmax(0, 1fr));
+ gap: 16px;
+ margin-top: 18px;
+}
+
+.batch-card,
+.empty-state {
+  padding: 18px;
+  border: 1px solid var(--line);
+  border-radius: 22px;
+  background: var(--panel);
+  box-shadow: 0 12px 32px rgba(31, 37, 32, 0.06);
+}
+
+/* The transition lives on the base rule so the card eases both when the
+   pointer enters and when it leaves; declared only on :hover it would snap
+   back instantly on leave. */
+.batch-card {
+  transition: transform 160ms ease;
+}
+
+.batch-card:hover {
+  transform: translateY(-2px);
+}
+
+.card-top {
+ display: flex;
+ justify-content: space-between;
+ gap: 12px;
+}
+
+.card-top h2 {
+ margin: 0;
+ font-size: 1.3rem;
+}
+
+.batch-id {
+ margin: 6px 0 0;
+ color: var(--muted);
+ font-size: 0.82rem;
+ word-break: break-all;
+}
+
+.status-badge {
+ align-self: flex-start;
+ padding: 6px 10px;
+ border-radius: 999px;
+ font-size: 0.8rem;
+ text-transform: capitalize;
+ white-space: nowrap;
+}
+
+.status-success { background: var(--accent-soft); color: var(--accent); }
+.status-failed, .status-partially_failed { background: var(--danger-soft); color: var(--danger); }
+.status-running { background: var(--info-soft); color: var(--info); }
+.status-needs_review { background: var(--warning-soft); color: var(--warning); }
+.status-pending, .status-cancelled { background: var(--neutral-soft); color: #5d655f; }
+
+.metrics {
+ display: grid;
+ grid-template-columns: repeat(3, minmax(0, 1fr));
+ gap: 12px;
+ margin: 18px 0;
+}
+
+.metrics div {
+ padding: 10px 12px;
+ border-radius: 14px;
+ background: #fff;
+}
+
+.metrics dt {
+ color: var(--muted);
+ font-size: 0.78rem;
+}
+
+.metrics dd {
+ margin: 6px 0 0;
+ font-size: 1.1rem;
+}
+
+.timestamps {
+ color: var(--muted);
+ font-size: 0.88rem;
+}
+
+@media (max-width: 960px) {
+ .toolbar {
+ grid-template-columns: 1fr 1fr;
+ }
+
+ .batch-grid {
+ grid-template-columns: 1fr 1fr;
+ }
+}
+
+@media (max-width: 640px) {
+ .toolbar,
+ .batch-grid,
+ .metrics {
+ grid-template-columns: 1fr;
+ }
+}
diff --git a/src/apps/worker/builtin_agents.py b/src/apps/worker/builtin_agents.py
index d04cb57..5a79043 100644
--- a/src/apps/worker/builtin_agents.py
+++ b/src/apps/worker/builtin_agents.py
@@ -1,5 +1,6 @@
from __future__ import annotations
+from dataclasses import is_dataclass
from typing import Any, Protocol
from src.packages.core.db.models import TaskORM
@@ -10,6 +11,23 @@ def run(self, task: TaskORM, context: dict[str, Any]) -> dict[str, Any]:
...
+def _serialize_context(context: Any) -> dict[str, Any]:
+ if isinstance(context, dict):
+ return context
+
+ if is_dataclass(context):
+ return {
+ "run_id": getattr(context, "run_id", None),
+ "task_id": getattr(context, "task_id", None),
+ "agent_role_name": getattr(context, "agent_role_name", None),
+ "started_at": getattr(context, "started_at", None).isoformat()
+ if getattr(context, "started_at", None) is not None
+ else None,
+ }
+
+ return {"value": str(context)}
+
+
class EchoWorkerAgent:
def run(self, task: TaskORM, context: dict[str, Any]) -> dict[str, Any]:
return {
@@ -17,7 +35,7 @@ def run(self, task: TaskORM, context: dict[str, Any]) -> dict[str, Any]:
"task_id": task.id,
"task_type": task.task_type,
"echo": task.input_payload,
- "context": context,
+ "context": _serialize_context(context),
}
@@ -38,7 +56,47 @@ def run(self, task: TaskORM, context: dict[str, Any]) -> dict[str, Any]:
"normalized_text": text,
"tags": tags,
"steps": steps,
- "context": context,
+ "context": _serialize_context(context),
+ }
+
+
+class SearchWorkerAgent:
+ def run(self, task: TaskORM, context: dict[str, Any]) -> dict[str, Any]:
+ query = str(task.input_payload.get("query", task.input_payload.get("text", ""))).strip()
+ keywords = [part for part in query.split(" ") if part]
+ return {
+ "status": "ok",
+ "stage": "search",
+ "task_id": task.id,
+ "query": query,
+ "search_plan": {
+ "keywords": keywords,
+ "sources": ["web_search", "docs_index"],
+ "intent": "research",
+ },
+ "context": _serialize_context(context),
+ }
+
+
+class CodeWorkerAgent:
+ def run(self, task: TaskORM, context: dict[str, Any]) -> dict[str, Any]:
+ prompt = str(task.input_payload.get("prompt", task.input_payload.get("text", ""))).strip()
+ language = str(task.input_payload.get("language", "python")).strip() or "python"
+ summary = prompt if prompt else f"implement task_type={task.task_type}"
+ return {
+ "status": "ok",
+ "stage": "code",
+ "task_id": task.id,
+ "code_plan": {
+ "language": language,
+ "summary": summary,
+ "steps": [
+ "inspect existing code",
+ "apply minimal change",
+ "run targeted tests",
+ ],
+ },
+ "context": _serialize_context(context),
}
@@ -52,7 +110,7 @@ def run(self, task: TaskORM, context: dict[str, Any]) -> dict[str, Any]:
"summary": f"processed task_type={task.task_type}",
"input": task.input_payload,
},
- "context": context,
+ "context": _serialize_context(context),
}
@@ -70,5 +128,5 @@ def run(self, task: TaskORM, context: dict[str, Any]) -> dict[str, Any]:
"notes": "manual review required due to failed validation"
if needs_manual_review
else "auto review passed",
- "context": context,
+ "context": _serialize_context(context),
}
diff --git a/src/apps/worker/registry.py b/src/apps/worker/registry.py
index 8aefc2e..53dab1b 100644
--- a/src/apps/worker/registry.py
+++ b/src/apps/worker/registry.py
@@ -5,11 +5,13 @@
from src.packages.core.db.models import TaskORM
from src.apps.worker.builtin_agents import (
+ CodeWorkerAgent,
DefaultWorkerAgent as BuiltinDefaultWorkerAgent,
EchoWorkerAgent,
FailingWorkerAgent,
PlannerWorkerAgent,
ReviewerWorkerAgent,
+ SearchWorkerAgent,
WorkerAgent,
)
@@ -47,12 +49,21 @@ def get(self, role_name: str) -> AgentRunner | None:
def build_default_registry() -> AgentRegistry:
+ """Create an ``AgentRegistry`` with every built-in agent registered under its role name."""
registry = AgentRegistry()
registry.register("default_worker", DefaultWorkerAgent())
+ registry.register("search_agent", SearchWorkerAgent())
+ registry.register("code_agent", CodeWorkerAgent())
+ registry.register("planner_agent", PlannerWorkerAgent())
+ registry.register("worker_agent", BuiltinDefaultWorkerAgent())
+ registry.register("reviewer_agent", ReviewerWorkerAgent())
+ registry.register("echo_worker", EchoWorkerAgent())
+ registry.register("failing_worker", FailingWorkerAgent())
return registry
def get_worker_agent(role_name: str) -> WorkerAgent:
agents: dict[str, WorkerAgent] = {
"default_worker": EchoWorkerAgent(),
+ "search_agent": SearchWorkerAgent(),
+ "code_agent": CodeWorkerAgent(),
"echo_worker": EchoWorkerAgent(),
"failing_worker": FailingWorkerAgent(),
"planner_agent": PlannerWorkerAgent(),
diff --git a/src/packages/core/schemas.py b/src/packages/core/schemas.py
index 8023000..89efe7b 100644
--- a/src/packages/core/schemas.py
+++ b/src/packages/core/schemas.py
@@ -123,6 +123,24 @@ class TaskBatchSummaryRead(SchemaModel):
artifacts: list[BatchArtifactRead]
+class TaskBatchListItemRead(SchemaModel):
+ """One row of the task-batch list: identity, timestamps and task summary."""
+
+ batch_id: str
+ title: str
+ created_at: datetime
+ # Latest task ``updated_at``, or the batch's ``created_at`` when it has no tasks.
+ updated_at: datetime
+ total_tasks: int
+ # Status derived from the batch's tasks rather than stored on the batch.
+ derived_status: str
+ # Percentage of successful tasks, rounded to two decimals.
+ success_rate: float
+ completed_count: int
+ success_count: int
+ failed_count: int
+ cancelled_count: int
+
+
+class TaskBatchListResponse(SchemaModel):
+ """Response body of the task-batch list endpoint."""
+
+ # Defaults to an empty list when no items are supplied.
+ items: list[TaskBatchListItemRead] = Field(default_factory=list)
+
+
class TaskCreate(SchemaModel):
batch_id: str
title: str
diff --git a/src/tests/test_builtin_demo_chain.py b/src/tests/test_builtin_demo_chain.py
index defc804..1f9d7f4 100644
--- a/src/tests/test_builtin_demo_chain.py
+++ b/src/tests/test_builtin_demo_chain.py
@@ -34,10 +34,7 @@ def _database_url() -> str:
def _cleanup_database() -> None:
engine = create_engine(_database_url())
with engine.begin() as conn:
- conn.execute(
- text("DELETE FROM task_batches WHERE title LIKE :prefix"),
- {"prefix": f"{DEMO_PREFIX}%"},
- )
+ conn.execute(text("DELETE FROM task_batches"))
def _demo_payload(suffix: str) -> dict:
@@ -81,10 +78,13 @@ def _demo_payload(suffix: str) -> dict:
_cleanup_database()
from src.apps.api.app import app # noqa: E402
-from src.apps.worker.service import WorkerService # noqa: E402
+from src.apps.api.bootstrap import ensure_builtin_agent_roles # noqa: E402
+from src.apps.worker.executor import run_next_task # noqa: E402
+from src.apps.worker.registry import build_default_registry # noqa: E402
def setup_function() -> None:
_cleanup_database()
+ ensure_builtin_agent_roles()
def teardown_function() -> None:
@@ -104,10 +104,18 @@ def test_builtin_roles_seeded_once_and_demo_chain_runs() -> None:
reviewer_count = conn.execute(
text("SELECT count(*) FROM agent_roles WHERE role_name = 'reviewer_agent'")
).scalar_one()
+ search_count = conn.execute(
+ text("SELECT count(*) FROM agent_roles WHERE role_name = 'search_agent'")
+ ).scalar_one()
+ code_count = conn.execute(
+ text("SELECT count(*) FROM agent_roles WHERE role_name = 'code_agent'")
+ ).scalar_one()
assert planner_count == 1
assert worker_count == 1
assert reviewer_count == 1
+ assert search_count == 1
+ assert code_count == 1
suffix = uuid.uuid4().hex[:8]
create_response = client.post("/task-batches", json=_demo_payload(suffix))
@@ -116,10 +124,10 @@ def test_builtin_roles_seeded_once_and_demo_chain_runs() -> None:
created_ids = [task["task_id"] for task in created["tasks"]]
with Session(engine) as session:
- worker = WorkerService(session)
- first_run = worker.run_once()
- second_run = worker.run_once()
- third_run = worker.run_once()
+ registry = build_default_registry()
+ first_run = run_next_task(session, registry)
+ second_run = run_next_task(session, registry)
+ third_run = run_next_task(session, registry)
assert first_run is not None
assert second_run is not None
assert third_run is not None
@@ -158,4 +166,81 @@ def test_builtin_roles_seeded_once_and_demo_chain_runs() -> None:
for event in events_response.json()
if event["event_type"] == "task_status_changed"
]
- assert statuses == ["queued", "running", "success"]
+ assert statuses[-2:] == ["running", "success"]
+ assert statuses[0] in {"queued", "blocked"}
+
+
+def test_builtin_search_and_code_roles_execute_distinct_outputs() -> None:
+ """Submit search and code tasks, run them, and check each role's output shape."""
+ with TestClient(app) as client:
+ engine = create_engine(_database_url())
+ suffix = uuid.uuid4().hex[:8]
+ # Two "research_topic" tasks and one "implement_feature" task, with no dependencies.
+ payload = {
+ "title": f"{DEMO_PREFIX}search-code-{suffix}",
+ "description": "built-in search and code demo",
+ "created_by": "pytest",
+ "metadata": {"suite": "builtin-search-code"},
+ "tasks": [
+ {
+ "client_task_id": "task_1",
+ "title": f"{DEMO_PREFIX}search-{suffix}",
+ "task_type": "research_topic",
+ "priority": "medium",
+ "input_payload": {"query": "python worker patterns"},
+ "expected_output_schema": {"type": "object"},
+ "dependency_client_task_ids": [],
+ },
+ {
+ "client_task_id": "task_2",
+ "title": f"{DEMO_PREFIX}code-{suffix}",
+ "task_type": "implement_feature",
+ "priority": "medium",
+ "input_payload": {"prompt": "add worker retries", "language": "python"},
+ "expected_output_schema": {"type": "object"},
+ "dependency_client_task_ids": [],
+ },
+ {
+ "client_task_id": "task_3",
+ "title": f"{DEMO_PREFIX}filler-{suffix}",
+ "task_type": "research_topic",
+ "priority": "medium",
+ "input_payload": {"query": "queue locking strategy"},
+ "expected_output_schema": {"type": "object"},
+ "dependency_client_task_ids": [],
+ },
+ ],
+ }
+
+ create_response = client.post("/task-batches", json=payload)
+ assert create_response.status_code == 201
+ created = create_response.json()["tasks"]
+ # Routing check: each task type must land on the matching built-in role.
+ assignments = {task["title"]: task["assigned_agent_role"] for task in created}
+ assert assignments[f"{DEMO_PREFIX}search-{suffix}"] == "search_agent"
+ assert assignments[f"{DEMO_PREFIX}code-{suffix}"] == "code_agent"
+
+ # Execute one task per call; three calls cover the three submitted tasks.
+ with Session(engine) as session:
+ registry = build_default_registry()
+ first_run = run_next_task(session, registry)
+ second_run = run_next_task(session, registry)
+ third_run = run_next_task(session, registry)
+ assert first_run is not None
+ assert second_run is not None
+ assert third_run is not None
+ run_ids = [first_run.id, second_run.id, third_run.id]
+
+ run_payloads = []
+ for run_id in run_ids:
+ run_response = client.get(f"/runs/{run_id}")
+ assert run_response.status_code == 200
+ run_payloads.append(run_response.json())
+
+ # The checks are order-independent: they only require that both stages appear.
+ stages = {payload["output_snapshot"].get("stage") for payload in run_payloads}
+ assert "search" in stages
+ assert "code" in stages
+ assert any(
+ payload["output_snapshot"].get("search_plan", {}).get("intent") == "research"
+ for payload in run_payloads
+ )
+ assert any(
+ payload["output_snapshot"].get("code_plan", {}).get("language") == "python"
+ for payload in run_payloads
+ )
diff --git a/src/tests/test_task_batch_list.py b/src/tests/test_task_batch_list.py
new file mode 100644
index 0000000..a0bf433
--- /dev/null
+++ b/src/tests/test_task_batch_list.py
@@ -0,0 +1,180 @@
+from __future__ import annotations
+
+import os
+import sys
+import uuid
+from pathlib import Path
+
+from fastapi.testclient import TestClient
+from sqlalchemy import create_engine, text
+from sqlalchemy.orm import Session
+
+
+ROOT = Path(__file__).resolve().parents[2]
+TEST_PREFIX = "batch-list-test-"
+
+if str(ROOT) not in sys.path:
+ sys.path.insert(0, str(ROOT))
+
+
+def _database_url() -> str:
+ database_url = os.getenv("DATABASE_URL")
+ if not database_url:
+ raise RuntimeError("DATABASE_URL is not set")
+ return database_url
+
+
+def _cleanup_database() -> None:
+ engine = create_engine(_database_url())
+ with engine.begin() as conn:
+ conn.execute(text("DELETE FROM task_batches"))
+ conn.execute(text("DELETE FROM agent_roles"))
+
+
+def _register_agent(client: TestClient, *, role_name: str, supported_task_types: list[str]) -> None:
+ payload = {
+ "role_name": role_name,
+ "description": "batch list role",
+ "capabilities": [f"task:{task_type}" for task_type in supported_task_types] or ["default_worker"],
+ "capability_declaration": {
+ "supported_task_types": supported_task_types,
+ "input_requirements": {"properties": {"text": {"type": "string"}}},
+ "output_contract": {"type": "object"},
+ "supports_concurrency": True,
+ "allows_auto_retry": False,
+ },
+ "input_schema": {},
+ "output_schema": {},
+ "timeout_seconds": 300,
+ "max_retries": 0,
+ "enabled": True,
+ "version": "1.0.0",
+ }
+ response = client.post("/agents/register", json=payload)
+ assert response.status_code in {201, 400}
+
+
+def _batch_payload(title: str, task_type: str) -> dict:
+ return {
+ "title": title,
+ "description": "batch list batch",
+ "created_by": "pytest",
+ "metadata": {"suite": "batch-list"},
+ "tasks": [
+ {
+ "client_task_id": "task_1",
+ "title": f"{title}-1",
+ "task_type": task_type,
+ "priority": "medium",
+ "input_payload": {"text": "alpha"},
+ "expected_output_schema": {"type": "object"},
+ "dependency_client_task_ids": [],
+ },
+ {
+ "client_task_id": "task_2",
+ "title": f"{title}-2",
+ "task_type": task_type,
+ "priority": "medium",
+ "input_payload": {"text": "beta"},
+ "expected_output_schema": {"type": "object"},
+ "dependency_client_task_ids": [],
+ },
+ {
+ "client_task_id": "task_3",
+ "title": f"{title}-3",
+ "task_type": task_type,
+ "priority": "medium",
+ "input_payload": {"text": "gamma"},
+ "expected_output_schema": {"type": "object"},
+ "dependency_client_task_ids": [],
+ },
+ ],
+ }
+
+
+_cleanup_database()
+
+from src.apps.api.app import app # noqa: E402
+
+
+client = TestClient(app)
+
+
+def setup_function() -> None:
+ """Empty the batch and agent-role tables before each test."""
+ _cleanup_database()
+
+
+def teardown_function() -> None:
+ """Empty the batch and agent-role tables after each test."""
+ _cleanup_database()
+
+
+def test_list_task_batches_returns_summary_fields() -> None:
+ """A freshly created batch is listed with its title, totals, status and rate."""
+ suffix = uuid.uuid4().hex[:8]
+ _register_agent(client, role_name=f"{TEST_PREFIX}generate-{suffix}", supported_task_types=["generate"])
+
+ response = client.post("/task-batches", json=_batch_payload(f"{TEST_PREFIX}alpha-{suffix}", "generate"))
+ assert response.status_code == 201
+
+ list_response = client.get("/task-batches")
+ assert list_response.status_code == 200
+ items = list_response.json()["items"]
+ # Exactly one item: setup_function emptied the table before this test.
+ assert len(items) == 1
+ item = items[0]
+ assert item["title"] == f"{TEST_PREFIX}alpha-{suffix}"
+ assert item["total_tasks"] == 3
+ assert item["derived_status"] == "pending"
+ assert item["success_rate"] == 0.0
+ assert "updated_at" in item
+
+
+def test_list_task_batches_filters_by_derived_status() -> None:
+ """The ``status`` query parameter restricts the list to one derived status."""
+ suffix = uuid.uuid4().hex[:8]
+ _register_agent(client, role_name="default_worker", supported_task_types=[])
+
+ pending_response = client.post("/task-batches", json=_batch_payload(f"{TEST_PREFIX}pending-{suffix}", "generate"))
+ assert pending_response.status_code == 201
+
+ pending_list = client.get("/task-batches", params={"status": "pending"})
+ assert pending_list.status_code == 200
+ # NOTE(review): all(...) is also true for an empty list, so this does not
+ # prove that the batch created above was actually returned.
+ assert all(item["derived_status"] == "pending" for item in pending_list.json()["items"])
+
+ # Wipes the batch above and every agent role, including "default_worker".
+ # Presumably this leaves the "no_match" tasks without an agent so the batch
+ # derives "needs_review" - TODO confirm against the routing code.
+ _cleanup_database()
+ review_response = client.post("/task-batches", json=_batch_payload(f"{TEST_PREFIX}review-{suffix}", "no_match"))
+ assert review_response.status_code == 201
+
+ review_list = client.get("/task-batches", params={"status": "needs_review"})
+ assert review_list.status_code == 200
+ assert len(review_list.json()["items"]) == 1
+ assert review_list.json()["items"][0]["title"] == f"{TEST_PREFIX}review-{suffix}"
+
+
+def test_list_task_batches_supports_search_and_sort() -> None:
+ """``search`` matches on the title and ``sort=created_at_asc`` lists oldest first."""
+ suffix = uuid.uuid4().hex[:8]
+ _register_agent(client, role_name=f"{TEST_PREFIX}generate-{suffix}", supported_task_types=["generate"])
+
+ # "zeta" is created before "alpha" so creation order differs from title order.
+ first = client.post("/task-batches", json=_batch_payload(f"{TEST_PREFIX}zeta-{suffix}", "generate"))
+ assert first.status_code == 201
+ second = client.post("/task-batches", json=_batch_payload(f"{TEST_PREFIX}alpha-{suffix}", "generate"))
+ assert second.status_code == 201
+
+ search_response = client.get("/task-batches", params={"search": "alpha"})
+ assert search_response.status_code == 200
+ assert len(search_response.json()["items"]) == 1
+ assert search_response.json()["items"][0]["title"] == f"{TEST_PREFIX}alpha-{suffix}"
+
+ sort_response = client.get("/task-batches", params={"sort": "created_at_asc"})
+ assert sort_response.status_code == 200
+ titles = [item["title"] for item in sort_response.json()["items"]]
+ assert titles == [f"{TEST_PREFIX}zeta-{suffix}", f"{TEST_PREFIX}alpha-{suffix}"]
+
+
+def test_list_task_batches_returns_empty_list_when_no_batches_exist() -> None:
+ """With no batches stored, the endpoint answers 200 with an empty ``items`` list."""
+ response = client.get("/task-batches")
+ assert response.status_code == 200
+ assert response.json() == {"items": []}
+
+
+def test_console_batches_page_is_accessible() -> None:
+ """``GET /console/batches`` returns a page containing the text "Batch Console"."""
+ response = client.get("/console/batches")
+ assert response.status_code == 200
+ assert "Batch Console" in response.text
diff --git a/src/tests/test_task_routing.py b/src/tests/test_task_routing.py
index 854dd0b..dc01ab0 100644
--- a/src/tests/test_task_routing.py
+++ b/src/tests/test_task_routing.py
@@ -123,6 +123,7 @@ def _register_agent(
_cleanup_database()
from src.apps.api.app import app # noqa: E402
+from src.apps.api.bootstrap import ensure_builtin_agent_roles # noqa: E402
client = TestClient(app)
@@ -130,6 +131,7 @@ def _register_agent(
def setup_function() -> None:
_cleanup_database()
+ ensure_builtin_agent_roles()
def teardown_function() -> None:
@@ -171,6 +173,22 @@ def test_routes_tasks_by_capability_when_task_type_not_declared() -> None:
assert all(task["routing_reason"] == "matched by capability=task:write_summary" for task in tasks)
+def test_routes_builtin_search_and_code_roles_by_capability() -> None:
+ """Built-in search and code roles are selected through their ``task:<type>`` capability."""
+ suffix = uuid.uuid4().hex[:8]
+
+ # "research_topic" tasks must all route to search_agent by capability.
+ search_response = client.post("/task-batches", json=_batch_payload("research_topic", f"{suffix}-search"))
+ assert search_response.status_code == 201
+ search_tasks = search_response.json()["tasks"]
+ assert all(task["assigned_agent_role"] == "search_agent" for task in search_tasks)
+ assert all(task["routing_reason"] == "matched by capability=task:research_topic" for task in search_tasks)
+
+ # "implement_feature" tasks must all route to code_agent by capability.
+ code_response = client.post("/task-batches", json=_batch_payload("implement_feature", f"{suffix}-code"))
+ assert code_response.status_code == 201
+ code_tasks = code_response.json()["tasks"]
+ assert all(task["assigned_agent_role"] == "code_agent" for task in code_tasks)
+ assert all(task["routing_reason"] == "matched by capability=task:implement_feature" for task in code_tasks)
+
+
def test_routes_tasks_to_default_worker_as_fallback() -> None:
suffix = uuid.uuid4().hex[:8]
role = _register_agent(