diff --git a/alembic/versions/20260410_000003_expand_review_checkpoints.py b/alembic/versions/20260410_000003_expand_review_checkpoints.py new file mode 100644 index 0000000..3e7482c --- /dev/null +++ b/alembic/versions/20260410_000003_expand_review_checkpoints.py @@ -0,0 +1,52 @@ +"""expand review checkpoints + +Revision ID: 20260410_000003 +Revises: 20260330_000002 +Create Date: 2026-04-10 16:25:00 +""" + +from __future__ import annotations + +from alembic import op +import sqlalchemy as sa + + +revision = "20260410_000003" +down_revision = "20260330_000002" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column( + "review_checkpoints", + sa.Column("reason_category", sa.String(length=64), nullable=False, server_default="other"), + ) + op.add_column( + "review_checkpoints", + sa.Column("timeout_policy", sa.String(length=32), nullable=False, server_default="fail_closed"), + ) + op.add_column( + "review_checkpoints", + sa.Column("deadline_at", sa.DateTime(timezone=True), nullable=True), + ) + op.create_index( + "ix_review_checkpoints_reason_category", + "review_checkpoints", + ["reason_category"], + unique=False, + ) + op.create_index( + "ix_review_checkpoints_deadline_at", + "review_checkpoints", + ["deadline_at"], + unique=False, + ) + + +def downgrade() -> None: + op.drop_index("ix_review_checkpoints_deadline_at", table_name="review_checkpoints") + op.drop_index("ix_review_checkpoints_reason_category", table_name="review_checkpoints") + op.drop_column("review_checkpoints", "deadline_at") + op.drop_column("review_checkpoints", "timeout_policy") + op.drop_column("review_checkpoints", "reason_category") diff --git a/src/apps/api/routers/reviews.py b/src/apps/api/routers/reviews.py index 175f16e..813a13c 100644 --- a/src/apps/api/routers/reviews.py +++ b/src/apps/api/routers/reviews.py @@ -1,6 +1,6 @@ from __future__ import annotations -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy import func, select @@ -9,9 +9,17 @@ from src.apps.api.deps import get_db from src.packages.core.db.models import AgentRoleORM, AssignmentORM, EventLogORM, ReviewCheckpointORM, TaskORM from src.packages.core.schemas import ( + BulkReviewApproveRequest, + BulkReviewDecisionResponse, + BulkReviewItemResult, + BulkReviewReassignRequest, + BulkReviewRejectRequest, ReviewCheckpointRead, ReviewDecisionApproveRequest, + ReviewDecisionReassignRequest, ReviewDecisionRejectRequest, + ReviewTimeoutProcessRequest, + ReviewTimeoutProcessResponse, TaskRead, ) from src.packages.core.task_state_machine import TaskStatusTransitionError, transition_task_status @@ -23,6 +31,10 @@ def _now() -> datetime: return datetime.now(timezone.utc) +def _next_review_deadline() -> datetime: + return _now().replace(microsecond=0) + timedelta(minutes=30) + + def _dependencies_satisfied(task: TaskORM, db: Session) -> bool: if not task.dependency_ids: return True @@ -55,66 +67,51 @@ def _validate_review_pending(review: ReviewCheckpointORM, task: TaskORM) -> None ) -@router.get("/tasks/{task_id}/reviews", response_model=list[ReviewCheckpointRead]) -def list_task_reviews(task_id: str, db: Session = Depends(get_db)) -> list[ReviewCheckpointRead]: - task = db.get(TaskORM, task_id) - if task is None: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found") - - reviews = db.scalars( - select(ReviewCheckpointORM) - .where(ReviewCheckpointORM.task_id == task_id) - .order_by(ReviewCheckpointORM.created_at.asc(), ReviewCheckpointORM.id.asc()) - ).all() - return [ReviewCheckpointRead.model_validate(review) for review in reviews] - - -@router.get("/reviews/{review_id}", response_model=ReviewCheckpointRead) -def get_review(review_id: str, db: Session = Depends(get_db)) -> ReviewCheckpointRead: - review = _get_review_or_404(db, review_id) - return ReviewCheckpointRead.model_validate(review) - - -@router.post("/reviews/{review_id}/approve", response_model=TaskRead) -def approve_review( - review_id: str, - payload: ReviewDecisionApproveRequest, - db: Session = Depends(get_db), -) -> TaskRead: - review = _get_review_or_404(db, review_id) - task = db.get(TaskORM, review.task_id) - if task is None: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found") - _validate_review_pending(review, task) - - agent_role = db.get(AgentRoleORM, payload.agent_role_id) +def _get_enabled_role(db: Session, agent_role_id: str) -> AgentRoleORM: + agent_role = db.get(AgentRoleORM, agent_role_id) if agent_role is None or not agent_role.enabled: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, - detail="Approved review requires an enabled agent role", + detail="Review decision requires an enabled agent role", ) + return agent_role - review.review_status = "approved" - review.reviewer = payload.reviewer - review.review_comment = payload.review_comment - review.resolved_at = _now() +def _supersede_active_assignments(db: Session, task_id: str) -> None: + active_assignments = db.scalars( + select(AssignmentORM) + .where( + AssignmentORM.task_id == task_id, + AssignmentORM.assignment_status == "active", + ) + .order_by(AssignmentORM.assigned_at.desc(), AssignmentORM.id.desc()) + ).all() + for assignment in active_assignments: + assignment.assignment_status = "superseded" + + +def _apply_assignment( + db: Session, + task: TaskORM, + *, + agent_role: AgentRoleORM, + routing_reason: str, +) -> str: + _supersede_active_assignments(db, task.id) task.assigned_agent_role = agent_role.role_name - assignment = AssignmentORM( - task_id=task.id, - agent_role_id=agent_role.id, - routing_reason=f"manually approved by {payload.reviewer}", - assignment_status="active", + db.add( + AssignmentORM( + task_id=task.id, + agent_role_id=agent_role.id, + routing_reason=routing_reason, + assignment_status="active", + ) ) - db.add(assignment) next_status = "queued" if _dependencies_satisfied(task, db) else "blocked" next_reason = ( - f"review approved by {payload.reviewer}; task queued for execution" - if next_status == "queued" - else f"review approved by {payload.reviewer}; waiting for dependencies to complete" + routing_reason if next_status == "queued" else f"{routing_reason}; waiting for dependencies to complete" ) - try: transition_task_status( db, @@ -125,45 +122,461 @@ def approve_review( ) except TaskStatusTransitionError as exc: raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(exc)) from exc - + return next_status + + +def _log_review_event( + db: Session, + *, + task: TaskORM, + event_type: str, + event_status: str, + message: str | None, + payload: dict, +) -> None: db.add( EventLogORM( batch_id=task.batch_id, task_id=task.id, - event_type="review_approved", - event_status=task.status, - message=payload.review_comment or review.reason, - payload={ - "task_id": task.id, - "review_id": review.id, - "reviewer": payload.reviewer, - "review_comment": payload.review_comment, - "agent_role_id": agent_role.id, - "agent_role_name": agent_role.role_name, - "resolved_at": review.resolved_at.isoformat(), - "next_status": next_status, - "source": "review", - }, + event_type=event_type, + event_status=event_status, + message=message, + payload=payload, ) ) - db.add( - EventLogORM( - batch_id=task.batch_id, - task_id=task.id, - event_type="task_review_resolved", - event_status=task.status, - message="review approved", - payload={ - "task_id": task.id, - "review_id": review.id, - "decision": "approved", - "reviewer": payload.reviewer, - "resolved_at": review.resolved_at.isoformat(), - "source": "review", - }, + + +def _resolve_review( + review: ReviewCheckpointORM, + *, + reviewer: str, + review_comment: str | None, + review_status: str, +) -> None: + review.review_status = review_status + review.reviewer = reviewer + review.review_comment = review_comment + review.resolved_at = _now() + + +def _approve_task( + db: Session, + *, + review: ReviewCheckpointORM, + task: TaskORM, + reviewer: str, + review_comment: str | None, + agent_role: AgentRoleORM, + event_type: str, + resolution_message: str, +) -> TaskORM: + _validate_review_pending(review, task) + _resolve_review( + review, + reviewer=reviewer, + review_comment=review_comment, + review_status="approved", + ) + next_status = _apply_assignment( + db, + task, + agent_role=agent_role, + routing_reason=resolution_message, + ) + _log_review_event( + db, + task=task, + event_type=event_type, + event_status=task.status, + message=review_comment or review.reason, + payload={ + "task_id": task.id, + "review_id": review.id, + "reviewer": reviewer, + "review_comment": review_comment, + "agent_role_id": agent_role.id, + "agent_role_name": agent_role.role_name, + "reason_category": review.reason_category, + "timeout_policy": review.timeout_policy, + "resolved_at": review.resolved_at.isoformat(), + "next_status": next_status, + "source": "review", + }, + ) + _log_review_event( + db, + task=task, + event_type="task_review_resolved", + event_status=task.status, + message="review approved" if event_type == "review_approved" else "review reassigned", + payload={ + "task_id": task.id, + "review_id": review.id, + "decision": "approved" if event_type == "review_approved" else "reassigned", + "reviewer": reviewer, + "resolved_at": review.resolved_at.isoformat(), + "source": "review", + }, + ) + return task + + +def _reject_task( + db: Session, + *, + review: ReviewCheckpointORM, + task: TaskORM, + reviewer: str, + review_comment: str, + failure_status: str = "failed", + event_type: str = "review_rejected", +) -> TaskORM: + _validate_review_pending(review, task) + _resolve_review( + review, + reviewer=reviewer, + review_comment=review_comment, + review_status="rejected", + ) + try: + transition_task_status( + db, + task, + to_status=failure_status, + reason=f"review rejected by {reviewer}", + source="review", ) + except TaskStatusTransitionError as exc: + raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(exc)) from exc + _log_review_event( + db, + task=task, + event_type=event_type, + event_status=task.status, + message=review_comment, + payload={ + "task_id": task.id, + "review_id": review.id, + "reviewer": reviewer, + "review_comment": review_comment, + "reason_category": review.reason_category, + "timeout_policy": review.timeout_policy, + "resolved_at": review.resolved_at.isoformat(), + "source": "review", + }, + ) + _log_review_event( + db, + task=task, + event_type="task_review_resolved", + event_status=task.status, + message="review rejected", + payload={ + "task_id": task.id, + "review_id": review.id, + "decision": "rejected", + "reviewer": reviewer, + "resolved_at": review.resolved_at.isoformat(), + "source": "review", + }, ) + return task + + +def _reassign_task( + db: Session, + *, + review: ReviewCheckpointORM, + task: TaskORM, + reviewer: str, + review_comment: str | None, + agent_role: AgentRoleORM, +) -> TaskORM: + return _approve_task( + db, + review=review, + task=task, + reviewer=reviewer, + review_comment=review_comment, + agent_role=agent_role, + event_type="review_reassigned", + resolution_message=f"review reassigned by {reviewer}", + ) + + +def _review_result(task: TaskORM, review_id: str, detail: str | None = None) -> BulkReviewItemResult: + return BulkReviewItemResult( + review_id=review_id, + ok=True, + task_id=task.id, + status=task.status, + assigned_agent_role=task.assigned_agent_role, + detail=detail, + ) + + +def _error_result(review_id: str, detail: str) -> BulkReviewItemResult: + return BulkReviewItemResult(review_id=review_id, ok=False, detail=detail) + +def _process_bulk( + db: Session, + review_ids: list[str], + handler, +) -> BulkReviewDecisionResponse: + items: list[BulkReviewItemResult] = [] + for review_id in review_ids: + try: + item = handler(review_id) + db.commit() + items.append(item) + except HTTPException as exc: + db.rollback() + items.append(_error_result(review_id, str(exc.detail))) + except Exception as exc: # pragma: no cover - defensive branch for API resilience + db.rollback() + items.append(_error_result(review_id, str(exc))) + return BulkReviewDecisionResponse(items=items) + + +@router.get("/tasks/{task_id}/reviews", response_model=list[ReviewCheckpointRead]) +def list_task_reviews(task_id: str, db: Session = Depends(get_db)) -> list[ReviewCheckpointRead]: + task = db.get(TaskORM, task_id) + if task is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found") + + reviews = db.scalars( + select(ReviewCheckpointORM) + .where(ReviewCheckpointORM.task_id == task_id) + .order_by(ReviewCheckpointORM.created_at.asc(), ReviewCheckpointORM.id.asc()) + ).all() + return [ReviewCheckpointRead.model_validate(review) for review in reviews] + + +@router.post("/reviews/bulk/approve", response_model=BulkReviewDecisionResponse) +def bulk_approve_reviews( + payload: BulkReviewApproveRequest, + db: Session = Depends(get_db), +) -> BulkReviewDecisionResponse: + agent_role = _get_enabled_role(db, payload.agent_role_id) + + def handler(review_id: str) -> BulkReviewItemResult: + review = _get_review_or_404(db, review_id) + task = db.get(TaskORM, review.task_id) + if task is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found") + updated_task = _approve_task( + db, + review=review, + task=task, + reviewer=payload.reviewer, + review_comment=payload.review_comment, + agent_role=agent_role, + event_type="review_approved", + resolution_message=f"review approved by {payload.reviewer}", + ) + return _review_result(updated_task, review_id, "approved") + + return _process_bulk(db, payload.review_ids, handler) + + +@router.post("/reviews/bulk/reject", response_model=BulkReviewDecisionResponse) +def bulk_reject_reviews( + payload: BulkReviewRejectRequest, + db: Session = Depends(get_db), +) -> BulkReviewDecisionResponse: + def handler(review_id: str) -> BulkReviewItemResult: + review = _get_review_or_404(db, review_id) + task = db.get(TaskORM, review.task_id) + if task is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found") + updated_task = _reject_task( + db, + review=review, + task=task, + reviewer=payload.reviewer, + review_comment=payload.review_comment, + ) + return _review_result(updated_task, review_id, "rejected") + + return _process_bulk(db, payload.review_ids, handler) + + +@router.post("/reviews/bulk/reassign", response_model=BulkReviewDecisionResponse) +def bulk_reassign_reviews( + payload: BulkReviewReassignRequest, + db: Session = Depends(get_db), +) -> BulkReviewDecisionResponse: + agent_role = _get_enabled_role(db, payload.agent_role_id) + + def handler(review_id: str) -> BulkReviewItemResult: + review = _get_review_or_404(db, review_id) + task = db.get(TaskORM, review.task_id) + if task is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found") + updated_task = _reassign_task( + db, + review=review, + task=task, + reviewer=payload.reviewer, + review_comment=payload.review_comment, + agent_role=agent_role, + ) + return _review_result(updated_task, review_id, "reassigned") + + return _process_bulk(db, payload.review_ids, handler) + + +@router.post("/reviews/process-timeouts", response_model=ReviewTimeoutProcessResponse) +def process_review_timeouts( + payload: ReviewTimeoutProcessRequest, + db: Session = Depends(get_db), +) -> ReviewTimeoutProcessResponse: + now = _now() + reviews = db.scalars( + select(ReviewCheckpointORM) + .where( + ReviewCheckpointORM.review_status == "pending", + ReviewCheckpointORM.deadline_at.is_not(None), + ReviewCheckpointORM.deadline_at <= now, + ) + .order_by(ReviewCheckpointORM.deadline_at.asc(), ReviewCheckpointORM.id.asc()) + .limit(payload.limit) + ).all() + + items: list[BulkReviewItemResult] = [] + for review in reviews: + try: + task = db.get(TaskORM, review.task_id) + if task is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found") + _validate_review_pending(review, task) + if review.timeout_policy == "cancel_task": + updated_task = _reject_task( + db, + review=review, + task=task, + reviewer="system", + review_comment="review timeout policy applied: cancel_task", + failure_status="cancelled", + event_type="review_timeout_processed", + ) + items.append(_review_result(updated_task, review.id, "timeout_cancelled")) + elif review.timeout_policy == "escalate": + _resolve_review( + review, + reviewer="system", + review_comment="review timeout policy applied: escalate", + review_status="rejected", + ) + db.add( + ReviewCheckpointORM( + task_id=task.id, + reason=f"review escalated after timeout: {review.reason}", + reason_category="manual_override", + timeout_policy="fail_closed", + review_status="pending", + deadline_at=_next_review_deadline(), + ) + ) + _log_review_event( + db, + task=task, + event_type="review_timeout_processed", + event_status=task.status, + message=review.review_comment, + payload={ + "task_id": task.id, + "review_id": review.id, + "policy": review.timeout_policy, + "reviewer": "system", + "resolved_at": review.resolved_at.isoformat(), + "source": "review", + }, + ) + _log_review_event( + db, + task=task, + event_type="review_timeout_escalated", + event_status=task.status, + message=review.review_comment, + payload={ + "task_id": task.id, + "review_id": review.id, + "policy": review.timeout_policy, + "reviewer": "system", + "resolved_at": review.resolved_at.isoformat(), + "source": "review", + }, + ) + _log_review_event( + db, + task=task, + event_type="task_review_resolved", + event_status=task.status, + message="review timed out and escalated", + payload={ + "task_id": task.id, + "review_id": review.id, + "decision": "timed_out_escalated", + "reviewer": "system", + "resolved_at": review.resolved_at.isoformat(), + "source": "review", + }, + ) + items.append(_review_result(task, review.id, "timeout_escalated")) + else: + updated_task = _reject_task( + db, + review=review, + task=task, + reviewer="system", + review_comment="review timeout policy applied: fail_closed", + failure_status="failed", + event_type="review_timeout_processed", + ) + items.append(_review_result(updated_task, review.id, "timeout_failed")) + db.commit() + except HTTPException as exc: + db.rollback() + items.append(_error_result(review.id, str(exc.detail))) + except Exception as exc: # pragma: no cover + db.rollback() + items.append(_error_result(review.id, str(exc))) + + return ReviewTimeoutProcessResponse( + processed_count=sum(1 for item in items if item.ok), + items=items, + ) + + +@router.get("/reviews/{review_id}", response_model=ReviewCheckpointRead) +def get_review(review_id: str, db: Session = Depends(get_db)) -> ReviewCheckpointRead: + review = _get_review_or_404(db, review_id) + return ReviewCheckpointRead.model_validate(review) + + +@router.post("/reviews/{review_id}/approve", response_model=TaskRead) +def approve_review( + review_id: str, + payload: ReviewDecisionApproveRequest, + db: Session = Depends(get_db), +) -> TaskRead: + review = _get_review_or_404(db, review_id) + task = db.get(TaskORM, review.task_id) + if task is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found") + agent_role = _get_enabled_role(db, payload.agent_role_id) + task = _approve_task( + db, + review=review, + task=task, + reviewer=payload.reviewer, + review_comment=payload.review_comment, + agent_role=agent_role, + event_type="review_approved", + resolution_message=f"review approved by {payload.reviewer}", + ) db.commit() db.refresh(task) return TaskRead.model_validate(task) @@ -179,59 +592,37 @@ def reject_review( task = db.get(TaskORM, review.task_id) if task is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found") - _validate_review_pending(review, task) - - review.review_status = "rejected" - review.reviewer = payload.reviewer - review.review_comment = payload.review_comment - review.resolved_at = _now() + task = _reject_task( + db, + review=review, + task=task, + reviewer=payload.reviewer, + review_comment=payload.review_comment, + ) + db.commit() + db.refresh(task) + return TaskRead.model_validate(task) - try: - transition_task_status( - db, - task, - to_status="failed", - reason=f"review rejected by {payload.reviewer}", - source="review", - ) - except TaskStatusTransitionError as exc: - raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(exc)) from exc - db.add( - EventLogORM( - batch_id=task.batch_id, - task_id=task.id, - event_type="review_rejected", - event_status=task.status, - message=payload.review_comment, - payload={ - "task_id": task.id, - "review_id": review.id, - "reviewer": payload.reviewer, - "review_comment": payload.review_comment, - "resolved_at": review.resolved_at.isoformat(), - "source": "review", - }, - ) - ) - db.add( - EventLogORM( - batch_id=task.batch_id, - task_id=task.id, - event_type="task_review_resolved", - event_status=task.status, - message="review rejected", - payload={ - "task_id": task.id, - "review_id": review.id, - "decision": "rejected", - "reviewer": payload.reviewer, - "resolved_at": review.resolved_at.isoformat(), - "source": "review", - }, - ) +@router.post("/reviews/{review_id}/reassign", response_model=TaskRead) +def reassign_review( + review_id: str, + payload: ReviewDecisionReassignRequest, + db: Session = Depends(get_db), +) -> TaskRead: + review = _get_review_or_404(db, review_id) + task = db.get(TaskORM, review.task_id) + if task is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found") + agent_role = _get_enabled_role(db, payload.agent_role_id) + task = _reassign_task( + db, + review=review, + task=task, + reviewer=payload.reviewer, + review_comment=payload.review_comment, + agent_role=agent_role, ) - db.commit() db.refresh(task) return TaskRead.model_validate(task) diff --git a/src/apps/api/routers/task_batches.py b/src/apps/api/routers/task_batches.py index 63e1ccd..dac87a2 100644 --- a/src/apps/api/routers/task_batches.py +++ b/src/apps/api/routers/task_batches.py @@ -2,7 +2,7 @@ from collections import deque -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from fastapi import APIRouter, Depends, HTTPException, Query, status from sqlalchemy import select @@ -19,6 +19,7 @@ TaskBatchORM, TaskORM, ) +from src.packages.core.costs import estimate_cost from src.packages.core.error_classification import classify_task_error, summarize_failure_categories from src.packages.core.schemas import ( BatchTimelineRead, @@ -33,11 +34,14 @@ TaskBatchSummaryRead, TaskBatchSubmitRequest, TaskBatchSubmitResponse, + TaskBatchTaskCreate, TaskBatchSubmitTaskRead, + TaskNormalizationRead, ) +from src.packages.core.task_batch_normalization import normalize_batch_tasks from src.packages.core.timeline import load_batch_timeline from src.packages.core.task_state_machine import transition_task_status -from src.packages.router import route_task +from src.packages.router import RoleRoutingStats, route_task router = APIRouter(prefix="/task-batches", tags=["task-batches"]) @@ -46,6 +50,10 @@ def _now() -> datetime: return datetime.now(timezone.utc) +def _review_deadline() -> datetime: + return _now().replace(microsecond=0) + timedelta(minutes=30) + + def _build_batch_counts(tasks: list[TaskORM]) -> BatchCountsRead: counts = { "pending_count": 0, @@ -127,6 +135,48 @@ def _load_latest_runs(task_ids: list[str], db: Session) -> dict[str, ExecutionRu return latest_runs +def _load_role_routing_stats(db: Session) -> dict[str, RoleRoutingStats]: + runs = db.scalars( + select(ExecutionRunORM).order_by(ExecutionRunORM.started_at.asc(), ExecutionRunORM.id.asc()) + ).all() + if not runs: + return {} + + stats_by_role_id: dict[str, dict[str, float | int]] = {} + for run in runs: + stats = stats_by_role_id.setdefault( + run.agent_role_id, + { + "total_runs": 0, + "success_runs": 0, + "latency_sum": 0, + "latency_count": 0, + "cost_sum": 0.0, + }, + ) + stats["total_runs"] += 1 + if run.run_status == "success": + stats["success_runs"] += 1 + if run.latency_ms is not None: + stats["latency_sum"] += int(run.latency_ms) + stats["latency_count"] += 1 + stats["cost_sum"] += estimate_cost(run.token_usage) + + role_stats: dict[str, RoleRoutingStats] = {} + for role_id, raw in stats_by_role_id.items(): + latency_count = int(raw["latency_count"]) + average_latency_ms = round(raw["latency_sum"] / latency_count, 2) if latency_count else None + total_runs = int(raw["total_runs"]) + average_cost_estimate = round(raw["cost_sum"] / total_runs, 6) if total_runs else 0.0 + role_stats[role_id] = RoleRoutingStats( + total_runs=total_runs, + success_runs=int(raw["success_runs"]), + average_latency_ms=average_latency_ms, + average_cost_estimate=average_cost_estimate, + ) + return role_stats + + def _load_batch_artifacts(task_ids: list[str], db: Session) -> tuple[list[ArtifactORM], dict[str, int]]: if not task_ids: return [], {} @@ -263,9 +313,18 @@ def create_task_batch( payload: TaskBatchSubmitRequest, db: Session = Depends(get_db), ) -> TaskBatchSubmitResponse: - _validate_unique_client_task_ids(payload) - _validate_dependencies_exist(payload) - _detect_cycle(payload) + normalized_tasks, normalization_items = normalize_batch_tasks( + [task.model_dump() for task in payload.tasks] + ) + normalized_payload = payload.model_copy( + update={ + "tasks": [TaskBatchTaskCreate.model_validate(task) for task in normalized_tasks], + } + ) + + _validate_unique_client_task_ids(normalized_payload) + _validate_dependencies_exist(normalized_payload) + _detect_cycle(normalized_payload) task_mapping: dict[str, TaskORM] = {} routing_results: dict[str, dict[str, str | bool | None | list[str]]] = {} @@ -273,17 +332,17 @@ def create_task_batch( try: with db.begin(): task_batch = TaskBatchORM( - title=payload.title, - description=payload.description, - created_by=payload.created_by, + title=normalized_payload.title, + description=normalized_payload.description, + created_by=normalized_payload.created_by, status="draft", - total_tasks=len(payload.tasks), - metadata_json=payload.metadata, + total_tasks=len(normalized_payload.tasks), + metadata_json=normalized_payload.metadata, ) db.add(task_batch) db.flush() - for submitted_task in payload.tasks: + for submitted_task in normalized_payload.tasks: task = TaskORM( batch_id=task_batch.id, title=submitted_task.title, @@ -301,7 +360,7 @@ def create_task_batch( db.flush() task_mapping[submitted_task.client_task_id] = task - for submitted_task in payload.tasks: + for submitted_task in normalized_payload.tasks: task = task_mapping[submitted_task.client_task_id] task.dependency_ids = [ task_mapping[dependency_id].id @@ -309,10 +368,11 @@ def create_task_batch( ] agent_roles = db.scalars(select(AgentRoleORM)).all() + role_routing_stats = _load_role_routing_stats(db) - for submitted_task in payload.tasks: + for submitted_task in normalized_payload.tasks: task = task_mapping[submitted_task.client_task_id] - route_result = route_task(task, list(agent_roles)) + route_result = route_task(task, list(agent_roles), role_routing_stats) if route_result.needs_review: task.assigned_agent_role = None @@ -326,7 +386,10 @@ def create_task_batch( review_checkpoint = ReviewCheckpointORM( task_id=task.id, reason=route_result.routing_reason, + reason_category="routing_failure", + timeout_policy="fail_closed", review_status="pending", + deadline_at=_review_deadline(), ) db.add(review_checkpoint) db.flush() @@ -343,6 +406,9 @@ def create_task_batch( "task_id": task.id, "review_id": review_checkpoint.id, "reason": route_result.routing_reason, + "reason_category": review_checkpoint.reason_category, + "timeout_policy": review_checkpoint.timeout_policy, + "deadline_at": review_checkpoint.deadline_at.isoformat() if review_checkpoint.deadline_at else None, "source": "router", }, ) @@ -385,6 +451,8 @@ def create_task_batch( return TaskBatchSubmitResponse( batch_id=task_batch.id, + original_task_count=len(payload.tasks), + normalized_task_count=len(normalized_payload.tasks), tasks=[ TaskBatchSubmitTaskRead( task_id=task_mapping[submitted_task.client_task_id].id, @@ -397,7 +465,19 @@ def create_task_batch( auto_execute=routing_results[submitted_task.client_task_id]["auto_execute"], needs_review=routing_results[submitted_task.client_task_id]["needs_review"], ) - for submitted_task in payload.tasks + for submitted_task in normalized_payload.tasks + ], + normalization=[ + TaskNormalizationRead( + client_task_id=item.source_client_task_id, + effective_client_task_id=item.effective_client_task_id, + action=item.action, + is_ambiguous=item.is_ambiguous, + missing_fields_filled=item.missing_fields_filled, + inferred_dependency_client_task_ids=item.inferred_dependency_client_task_ids, + notes=item.notes, + ) + for item in normalization_items ], ) except HTTPException: diff --git a/src/domain/models.py b/src/domain/models.py index 9b3eb4f..fb8c8ea 100644 --- a/src/domain/models.py +++ b/src/domain/models.py @@ -66,6 +66,20 @@ class ReviewStatus(str, Enum): WAIVED = "waived" +class ReviewReasonCategory(str, Enum): + ROUTING_FAILURE = "routing_failure" + SCHEMA_MISMATCH = "schema_mismatch" + TIMEOUT_RISK = "timeout_risk" + MANUAL_OVERRIDE = "manual_override" + OTHER = "other" + + +class ReviewTimeoutPolicy(str, Enum): + FAIL_CLOSED = "fail_closed" + CANCEL_TASK = "cancel_task" + ESCALATE = "escalate" + + class DomainModel(BaseModel): model_config = ConfigDict( extra="forbid", @@ -143,8 +157,11 @@ class ReviewCheckpoint(DomainModel): id: str = Field(default_factory=lambda: _id("review")) task_id: str reason: str + reason_category: ReviewReasonCategory = ReviewReasonCategory.OTHER + timeout_policy: ReviewTimeoutPolicy = ReviewTimeoutPolicy.FAIL_CLOSED review_status: ReviewStatus = ReviewStatus.PENDING reviewer: str | None = None review_comment: str | None = None created_at: datetime = Field(default_factory=datetime.utcnow) + deadline_at: datetime | None = None resolved_at: datetime | None = None diff --git a/src/packages/core/db/models.py b/src/packages/core/db/models.py index a5f677d..3f7b053 100644 --- a/src/packages/core/db/models.py +++ b/src/packages/core/db/models.py @@ -125,10 +125,13 @@ class ReviewCheckpointORM(Base): id: Mapped[str] = mapped_column(String(64), primary_key=True, default=lambda: _id("review")) task_id: Mapped[str] = mapped_column(ForeignKey("tasks.id", ondelete="CASCADE"), nullable=False, index=True) reason: Mapped[str] = mapped_column(Text, nullable=False) + reason_category: Mapped[str] = mapped_column(String(64), nullable=False, default="other", index=True) + timeout_policy: Mapped[str] = mapped_column(String(32), nullable=False, default="fail_closed") review_status: Mapped[str] = mapped_column(String(32), nullable=False, default="pending", index=True) reviewer: Mapped[str | None] = mapped_column(String(255), nullable=True) review_comment: Mapped[str | None] = mapped_column(Text, nullable=True) created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, default=_now) + deadline_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True, index=True) resolved_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) task: Mapped["TaskORM"] = relationship(back_populates="review_checkpoints") diff --git a/src/packages/core/schemas.py b/src/packages/core/schemas.py index c683a07..bc01759 100644 --- a/src/packages/core/schemas.py +++ b/src/packages/core/schemas.py @@ -8,7 +8,9 @@ from src.domain.models import ( AssignmentStatus, ExecutionRunStatus, + ReviewReasonCategory, ReviewStatus, + ReviewTimeoutPolicy, TaskBatchStatus, TaskPriority, TaskStatus, @@ -68,9 +70,22 @@ class TaskBatchSubmitTaskRead(SchemaModel): needs_review: bool = False +class TaskNormalizationRead(SchemaModel): + client_task_id: str + effective_client_task_id: str + action: str + is_ambiguous: bool = False + missing_fields_filled: list[str] = Field(default_factory=list) + inferred_dependency_client_task_ids: list[str] = Field(default_factory=list) + notes: list[str] = Field(default_factory=list) + + class TaskBatchSubmitResponse(SchemaModel): batch_id: str + original_task_count: int + normalized_task_count: int tasks: list[TaskBatchSubmitTaskRead] + normalization: list[TaskNormalizationRead] = Field(default_factory=list) class BatchCountsRead(SchemaModel): @@ -388,9 +403,12 @@ class RunDetailRead(SchemaModel): class ReviewCheckpointCreate(SchemaModel): task_id: str reason: str + reason_category: ReviewReasonCategory = ReviewReasonCategory.OTHER + timeout_policy: ReviewTimeoutPolicy = ReviewTimeoutPolicy.FAIL_CLOSED review_status: ReviewStatus = ReviewStatus.PENDING reviewer: str | None = None review_comment: str | None = None + deadline_at: datetime | None = None class ReviewCheckpointRead(ReviewCheckpointCreate): @@ -410,6 +428,54 @@ class ReviewDecisionRejectRequest(SchemaModel): review_comment: str = Field(min_length=1) +class ReviewDecisionReassignRequest(SchemaModel): + reviewer: str = Field(min_length=1) + review_comment: str | None = None + agent_role_id: str = Field(min_length=1) + + +class BulkReviewApproveRequest(SchemaModel): + review_ids: list[str] = Field(min_length=1) + reviewer: str = Field(min_length=1) + review_comment: str | None = None + agent_role_id: str = Field(min_length=1) + + +class BulkReviewRejectRequest(SchemaModel): + review_ids: list[str] = Field(min_length=1) + reviewer: str = Field(min_length=1) + review_comment: str = Field(min_length=1) + + +class BulkReviewReassignRequest(SchemaModel): + review_ids: list[str] = Field(min_length=1) + reviewer: str = Field(min_length=1) + review_comment: str | None = None + agent_role_id: str = Field(min_length=1) + + +class BulkReviewItemResult(SchemaModel): + review_id: str + ok: bool + task_id: str | None = None + status: str | None = None + assigned_agent_role: str | None = None + detail: str | None = None + + +class BulkReviewDecisionResponse(SchemaModel): + items: list[BulkReviewItemResult] = Field(default_factory=list) + + +class ReviewTimeoutProcessRequest(SchemaModel): + limit: int = Field(default=100, ge=1, le=500) + + +class ReviewTimeoutProcessResponse(SchemaModel): + processed_count: int = 0 + items: list[BulkReviewItemResult] = Field(default_factory=list) + + class ArtifactCreate(SchemaModel): task_id: str | None = None run_id: str | None = None diff --git a/src/packages/core/task_batch_normalization.py b/src/packages/core/task_batch_normalization.py new file mode 100644 index 0000000..c74a937 --- /dev/null +++ b/src/packages/core/task_batch_normalization.py @@ -0,0 +1,267 @@ +from __future__ import annotations + +from dataclasses import dataclass +import json +import re +from typing import Any + + +STAGE_DEPENDENCY_HINTS = { + "write_summary": {"research", "analyze", "analysis"}, + "implement": {"design", "spec", "plan"}, + "test": {"implement", "build", "code"}, + "review": {"test", "draft", "write", "implement"}, +} + +AMBIGUOUS_TITLE_MARKERS = {"处理一下", "优化", "修复", "看看", "任务"} +AMBIGUOUS_TASK_TYPES = {"", "unknown", "task", "misc", "general"} + + +@dataclass +class NormalizedTask: + source_client_task_id: str + effective_client_task_id: str + title: str + description: str | None + task_type: str + priority: str + input_payload: dict[str, Any] + expected_output_schema: dict[str, Any] + dependency_client_task_ids: list[str] + is_ambiguous: bool + missing_fields_filled: list[str] + inferred_dependency_client_task_ids: list[str] + action: str + notes: list[str] + + +def _normalize_spaces(value: str) -> str: + return re.sub(r"\s+", " ", value.strip()) + + +def _normalized_title(value: str) -> str: + return _normalize_spaces(value).lower() + + +def _string_length_score(value: str | None) -> int: + return len((value or "").strip()) + + +def _task_completeness_score(task: dict[str, Any]) -> tuple[int, int, int]: + description_score = _string_length_score(task.get("description")) + payload_score = len(task.get("input_payload") or {}) + schema_score = len(task.get("expected_output_schema") or {}) + return (description_score, payload_score, schema_score) + + +def _dependency_signature(values: list[str] | None) -> tuple[str, ...]: + return tuple(values or []) + + +def _exact_signature(task: dict[str, Any]) -> str: + return json.dumps( + { + "title": _normalize_spaces(task["title"]), + "task_type": task["task_type"], + "description": _normalize_spaces(task.get("description") or ""), + "input_payload": task.get("input_payload") or {}, + "expected_output_schema": task.get("expected_output_schema") or {}, + "dependency_client_task_ids": task.get("dependency_client_task_ids") or [], + }, + sort_keys=True, + ensure_ascii=False, + ) + + +def _merge_signature(task: dict[str, Any]) -> tuple[str, str]: + return (task["task_type"], _normalized_title(task["title"])) + + +def _should_replace(current_best: dict[str, Any], challenger: dict[str, Any]) -> bool: + return _task_completeness_score(challenger) > _task_completeness_score(current_best) + + +def _fill_defaults(task: dict[str, Any]) -> tuple[dict[str, Any], list[str]]: + filled = dict(task) + missing_fields_filled: list[str] = [] + + description = filled.get("description") + if not description or not str(description).strip(): + filled["description"] = filled["title"] + missing_fields_filled.append("description") + + expected_output_schema = filled.get("expected_output_schema") + if not expected_output_schema: + filled["expected_output_schema"] = {"type": "object"} + missing_fields_filled.append("expected_output_schema") + + priority = filled.get("priority") + if not priority: + filled["priority"] = "medium" + missing_fields_filled.append("priority") + + dependencies = filled.get("dependency_client_task_ids") + if dependencies is None: + filled["dependency_client_task_ids"] = [] + missing_fields_filled.append("dependency_client_task_ids") + + input_payload = filled.get("input_payload") + if input_payload is None: + filled["input_payload"] = {} + missing_fields_filled.append("input_payload") + + return filled, missing_fields_filled + + +def _is_ambiguous(task: dict[str, Any]) -> bool: + title = _normalize_spaces(task["title"]) + if len(title) < 4: + return True + lowered_title = title.lower() + if any(marker in title for marker in AMBIGUOUS_TITLE_MARKERS): + return True + if (task.get("task_type") or "").strip().lower() in AMBIGUOUS_TASK_TYPES: + return True + payload = task.get("input_payload") or {} + if not payload: + return True + if set(payload.keys()) <= {"text", "content"} and not any(str(value).strip() for value in payload.values()): + return True + return False + + +def _infer_dependency( + task: dict[str, Any], + previous_tasks: list[dict[str, Any]], +) -> list[str]: + existing = list(task.get("dependency_client_task_ids") or []) + if existing: + return [] + + task_type = (task.get("task_type") or "").strip().lower() + title = _normalized_title(task["title"]) + hints = STAGE_DEPENDENCY_HINTS.get(task_type, set()) + if not hints: + if "review" in title: + hints = {"draft", "write", "implement"} + elif "test" in title: + hints = {"implement", "build", "code"} + elif "implement" in title: + hints = {"design", "plan", "spec"} + + for previous in reversed(previous_tasks): + previous_type = (previous.get("task_type") or "").strip().lower() + previous_title = _normalized_title(previous["title"]) + if previous_type in hints or any(hint in previous_title for hint in hints): + return [previous["client_task_id"]] + return [] + + +def normalize_batch_tasks( + tasks: list[dict[str, Any]], +) -> tuple[list[dict[str, Any]], list[NormalizedTask]]: + exact_seen: dict[str, dict[str, Any]] = {} + merge_seen: dict[tuple[str, str], dict[str, Any]] = {} + normalization_items: list[NormalizedTask] = [] + normalized_tasks: list[dict[str, Any]] = [] + + for original_task in tasks: + filled_task, missing_fields_filled = _fill_defaults(original_task) + exact_key = _exact_signature(filled_task) + + if exact_key in exact_seen: + kept = exact_seen[exact_key] + normalization_items.append( + NormalizedTask( + source_client_task_id=original_task["client_task_id"], + effective_client_task_id=kept["client_task_id"], + title=filled_task["title"], + description=filled_task["description"], + task_type=filled_task["task_type"], + priority=filled_task["priority"], + input_payload=filled_task["input_payload"], + expected_output_schema=filled_task["expected_output_schema"], + dependency_client_task_ids=filled_task["dependency_client_task_ids"], + is_ambiguous=_is_ambiguous(filled_task), + missing_fields_filled=missing_fields_filled, + inferred_dependency_client_task_ids=[], + action="deduped", + notes=[f"deduped_into={kept['client_task_id']}"], + ) + ) + continue + + merge_key = _merge_signature(filled_task) + if merge_key in merge_seen: + kept = merge_seen[merge_key] + if _dependency_signature(kept.get("dependency_client_task_ids")) == _dependency_signature( + filled_task.get("dependency_client_task_ids") + ): + if _should_replace(kept, filled_task): + kept.update( + { + "title": filled_task["title"], + "description": filled_task["description"], + "priority": filled_task["priority"], + "input_payload": filled_task["input_payload"], + "expected_output_schema": filled_task["expected_output_schema"], + } + ) + normalization_items.append( + NormalizedTask( + source_client_task_id=original_task["client_task_id"], + effective_client_task_id=kept["client_task_id"], + title=filled_task["title"], + description=filled_task["description"], + task_type=filled_task["task_type"], + priority=filled_task["priority"], + input_payload=filled_task["input_payload"], + expected_output_schema=filled_task["expected_output_schema"], + dependency_client_task_ids=filled_task["dependency_client_task_ids"], + is_ambiguous=_is_ambiguous(filled_task), + missing_fields_filled=missing_fields_filled, + inferred_dependency_client_task_ids=[], + action="merged", + notes=[f"merged_into={kept['client_task_id']}"], + ) + ) + continue + + inferred_dependency_client_task_ids = _infer_dependency(filled_task, normalized_tasks) + if inferred_dependency_client_task_ids: + filled_task["dependency_client_task_ids"] = inferred_dependency_client_task_ids + + normalized_tasks.append(filled_task) + exact_seen[_exact_signature(filled_task)] = filled_task + merge_seen[merge_key] = filled_task + notes: list[str] = [] + action = "normalized" if missing_fields_filled or inferred_dependency_client_task_ids else "kept" + if _is_ambiguous(filled_task): + notes.append("task marked as ambiguous") + if action == "kept": + action = "normalized" + if inferred_dependency_client_task_ids: + notes.append("dependency inferred from stage ordering") + if missing_fields_filled: + notes.append("missing fields filled with defaults") + + normalization_items.append( + NormalizedTask( + source_client_task_id=original_task["client_task_id"], + effective_client_task_id=filled_task["client_task_id"], + title=filled_task["title"], + description=filled_task["description"], + task_type=filled_task["task_type"], + priority=filled_task["priority"], + input_payload=filled_task["input_payload"], + expected_output_schema=filled_task["expected_output_schema"], + dependency_client_task_ids=filled_task["dependency_client_task_ids"], + is_ambiguous=_is_ambiguous(filled_task), + missing_fields_filled=missing_fields_filled, + inferred_dependency_client_task_ids=inferred_dependency_client_task_ids, + action=action, + notes=notes, + ) + ) + + return normalized_tasks, normalization_items diff --git a/src/packages/router/__init__.py b/src/packages/router/__init__.py index 14ec8a7..8996502 100644 --- a/src/packages/router/__init__.py +++ b/src/packages/router/__init__.py @@ -1,3 +1,3 @@ -from .rule_router import RouteResult, route_task +from .rule_router import RoleRoutingStats, RouteResult, route_task -__all__ = ["RouteResult", "route_task"] +__all__ = ["RoleRoutingStats", "RouteResult", "route_task"] diff --git a/src/packages/router/rule_router.py b/src/packages/router/rule_router.py index f10a602..fa12562 100644 --- a/src/packages/router/rule_router.py +++ b/src/packages/router/rule_router.py @@ -4,6 +4,8 @@ from src.packages.core.db.models import AgentRoleORM, TaskORM +ROUTING_META_INPUT_KEYS = {"cost_hint", "timeout_seconds"} + @dataclass(frozen=True) class RouteResult: @@ -14,6 +16,31 @@ class RouteResult: needs_review: bool +@dataclass(frozen=True) +class RoleRoutingStats: + total_runs: int = 0 + success_runs: int = 0 + average_latency_ms: float | None = None + average_cost_estimate: float = 0.0 + + @property + def success_rate(self) -> float | None: + if self.total_runs <= 0: + return None + return self.success_runs / self.total_runs + + +@dataclass(frozen=True) +class RoleCandidate: + role: AgentRoleORM + stats: RoleRoutingStats + matched_task_type: bool + matched_capability: bool + schema_compatible: bool + is_default_worker: bool + score: tuple[float, ...] + + def _sorted_roles(agent_roles: list[AgentRoleORM]) -> list[AgentRoleORM]: return sorted( [role for role in agent_roles if role.enabled], @@ -21,20 +48,20 @@ def _sorted_roles(agent_roles: list[AgentRoleORM]) -> list[AgentRoleORM]: ) -def _match_by_task_type(task: TaskORM, agent_roles: list[AgentRoleORM]) -> AgentRoleORM | None: - for role in _sorted_roles(agent_roles): - supported_task_types = role.input_schema.get("supported_task_types", []) - if task.task_type in supported_task_types: - return role - return None +def _supported_task_types(role: AgentRoleORM) -> list[str]: + supported_task_types = role.input_schema.get("supported_task_types", []) + if isinstance(supported_task_types, list): + return [str(item) for item in supported_task_types] + return [] + +def _matches_task_type(task: TaskORM, role: AgentRoleORM) -> bool: + return task.task_type in _supported_task_types(role) -def _match_by_capability(task: TaskORM, agent_roles: list[AgentRoleORM]) -> AgentRoleORM | None: + +def _matches_capability(task: TaskORM, role: AgentRoleORM) -> bool: target_capability = f"task:{task.task_type}" - for role in _sorted_roles(agent_roles): - if target_capability in role.capabilities: - return role - return None + return target_capability in role.capabilities def _schema_compatible(task: TaskORM, role: AgentRoleORM) -> bool: @@ -44,12 +71,14 @@ def _schema_compatible(task: TaskORM, role: AgentRoleORM) -> bool: required_properties = input_requirements.get("properties", {}) expected_output_type = expected_output.get("type") - # Schema compatibility should only apply when the role actually declares a schema contract. if not required_properties and not expected_output_type: return False if required_properties: - if not set(task.input_payload.keys()).issubset(set(required_properties.keys())): + comparable_input_keys = { + key for key in task.input_payload.keys() if key not in ROUTING_META_INPUT_KEYS + } + if not comparable_input_keys.issubset(set(required_properties.keys())): return False task_output_type = task.expected_output_schema.get("type") @@ -59,57 +88,137 @@ def _schema_compatible(task: TaskORM, role: AgentRoleORM) -> bool: return True -def _match_by_schema(task: TaskORM, agent_roles: list[AgentRoleORM]) -> AgentRoleORM | None: - for role in _sorted_roles(agent_roles): - if _schema_compatible(task, role): - return role - return None +def _has_schema_contract(role: AgentRoleORM) -> bool: + input_requirements = role.input_schema.get("input_requirements", {}) + output_contract = role.output_schema.get("output_contract", {}) + return bool(input_requirements.get("properties")) or bool(output_contract.get("type")) + + +def _is_default_worker(role: AgentRoleORM) -> bool: + return role.role_name == "default_worker" or "default_worker" in role.capabilities + + +def _meets_timeout_requirement(task: TaskORM, role: AgentRoleORM) -> bool: + timeout_required = task.input_payload.get("timeout_seconds") + if timeout_required is None: + return True + try: + timeout_required_int = int(timeout_required) + except (TypeError, ValueError): + return True + return role.timeout_seconds >= timeout_required_int + + +def _cost_preference_weight(task: TaskORM, stats: RoleRoutingStats) -> float: + cost_hint = str(task.input_payload.get("cost_hint", "")).strip().lower() + average_cost = float(stats.average_cost_estimate or 0.0) + + if average_cost <= 0: + return 0.0 + if cost_hint == "low": + return -average_cost * 1000 + if cost_hint == "high": + return 0.0 + return -average_cost * 100 + + +def _latency_weight(stats: RoleRoutingStats) -> float: + if stats.average_latency_ms is None: + return 0.0 + return -float(stats.average_latency_ms) / 1000 + + +def _build_routing_reason(candidate: RoleCandidate) -> str: + reasons: list[str] = [] + if candidate.matched_task_type: + reasons.append("task_type") + if candidate.matched_capability: + reasons.append("capability") + if candidate.schema_compatible: + reasons.append("schema") + if candidate.is_default_worker and not reasons: + reasons.append("default_worker") + + success_rate = candidate.stats.success_rate + if success_rate is None: + performance_note = "no_history" + else: + performance_note = f"success_rate={round(success_rate * 100, 2)}%" + + reason_summary = ",".join(reasons) if reasons else "fallback" + return ( + f"capability-ranked route selected role={candidate.role.role_name} " + f"via {reason_summary} ({performance_note})" + ) -def _match_default_role(agent_roles: list[AgentRoleORM]) -> AgentRoleORM | None: - for role in _sorted_roles(agent_roles): - if role.role_name == "default_worker" or "default_worker" in role.capabilities: - return role - return None +def _build_candidate( + task: TaskORM, + role: AgentRoleORM, + stats: RoleRoutingStats, +) -> RoleCandidate | None: + matched_task_type = _matches_task_type(task, role) + matched_capability = _matches_capability(task, role) + schema_compatible = _schema_compatible(task, role) + default_worker = _is_default_worker(role) + + if not any([matched_task_type, matched_capability, schema_compatible, default_worker]): + return None + + if not _meets_timeout_requirement(task, role): + return None + + if _has_schema_contract(role) and not schema_compatible: + return None + + success_rate = stats.success_rate + score = ( + 1.0 if matched_task_type else 0.0, + 1.0 if matched_capability else 0.0, + 1.0 if schema_compatible else 0.0, + 0.0 if default_worker else 1.0, + success_rate if success_rate is not None else 0.5, + _cost_preference_weight(task, stats), + _latency_weight(stats), + float(role.timeout_seconds), + ) + return RoleCandidate( + role=role, + stats=stats, + matched_task_type=matched_task_type, + matched_capability=matched_capability, + schema_compatible=schema_compatible, + is_default_worker=default_worker, + score=score, + ) -def route_task(task: TaskORM, agent_roles: list[AgentRoleORM]) -> RouteResult: - matched_role = _match_by_task_type(task, agent_roles) - if matched_role is not None: - return RouteResult( - agent_role_id=matched_role.id, - agent_role_name=matched_role.role_name, - routing_reason=f"matched by task_type={task.task_type}", - auto_execute=True, - needs_review=False, - ) - matched_role = _match_by_capability(task, agent_roles) - if matched_role is not None: - return RouteResult( - agent_role_id=matched_role.id, - agent_role_name=matched_role.role_name, - routing_reason=f"matched by capability=task:{task.task_type}", - auto_execute=True, - needs_review=False, - ) +def route_task( + task: TaskORM, + agent_roles: list[AgentRoleORM], + role_stats: dict[str, RoleRoutingStats] | None = None, +) -> RouteResult: + role_stats = role_stats or {} + candidates: list[RoleCandidate] = [] - matched_role = _match_by_schema(task, agent_roles) - if matched_role is not None: - return RouteResult( - agent_role_id=matched_role.id, - agent_role_name=matched_role.role_name, - routing_reason="matched by schema compatibility", - auto_execute=True, - needs_review=False, + for role in _sorted_roles(agent_roles): + candidate = _build_candidate( + task, + role, + role_stats.get(role.id, RoleRoutingStats()), ) + if candidate is not None: + candidates.append(candidate) - matched_role = _match_default_role(agent_roles) - if matched_role is not None: + if candidates: + candidates.sort(key=lambda candidate: candidate.role.role_name) + candidates.sort(key=lambda candidate: candidate.score, reverse=True) + selected = candidates[0] return RouteResult( - agent_role_id=matched_role.id, - agent_role_name=matched_role.role_name, - routing_reason="fallback to default_worker", + agent_role_id=selected.role.id, + agent_role_name=selected.role.role_name, + routing_reason=_build_routing_reason(selected), auto_execute=True, needs_review=False, ) diff --git a/src/tests/test_reviews_api.py b/src/tests/test_reviews_api.py index 6ee3692..7f46e6e 100644 --- a/src/tests/test_reviews_api.py +++ b/src/tests/test_reviews_api.py @@ -3,6 +3,7 @@ import os import sys import uuid +from datetime import datetime, timedelta, timezone from pathlib import Path from fastapi.testclient import TestClient @@ -93,12 +94,38 @@ def _batch_payload(suffix: str) -> dict: } +def _review_id_for_task(task_id: str) -> str: + return client.get(f"/tasks/{task_id}/reviews").json()[0]["id"] + + +def _set_review_policy(review_id: str, *, timeout_policy: str, deadline_at: datetime, reason_category: str = "routing_failure") -> None: + engine = create_engine(_database_url()) + with engine.begin() as conn: + conn.execute( + text( + """ + UPDATE review_checkpoints + SET timeout_policy = :timeout_policy, + deadline_at = :deadline_at, + reason_category = :reason_category + WHERE id = :review_id + """ + ), + { + "timeout_policy": timeout_policy, + "deadline_at": deadline_at, + "reason_category": reason_category, + "review_id": review_id, + }, + ) + + _cleanup_database() from src.apps.api.app import app # noqa: E402 from src.apps.worker.executor import run_next_task # noqa: E402 from src.apps.worker.registry import AgentRegistry # noqa: E402 -from src.packages.core.db.models import ExecutionRunORM, TaskORM # noqa: E402 +from src.packages.core.db.models import AssignmentORM, ExecutionRunORM, EventLogORM, ReviewCheckpointORM, TaskORM # noqa: E402 client = TestClient(app) @@ -117,7 +144,7 @@ def teardown_function() -> None: _cleanup_database() -def test_lists_task_reviews_and_review_created_event() -> None: +def test_lists_task_reviews_and_exposes_reason_category_timeout_policy_and_deadline() -> None: suffix = uuid.uuid4().hex[:8] response = client.post("/task-batches", json=_batch_payload(suffix)) assert response.status_code == 201 @@ -128,6 +155,9 @@ def test_lists_task_reviews_and_review_created_event() -> None: reviews = reviews_response.json() assert len(reviews) == 1 assert reviews[0]["review_status"] == "pending" + assert reviews[0]["reason_category"] == "routing_failure" + assert reviews[0]["timeout_policy"] == "fail_closed" + assert reviews[0]["deadline_at"] is not None assert "No eligible agent role found" in reviews[0]["reason"] review_id = reviews[0]["id"] @@ -146,9 +176,7 @@ def test_approve_review_assigns_role_and_worker_executes_task() -> None: response = client.post("/task-batches", json=_batch_payload(suffix)) assert response.status_code == 201 task_id = response.json()["tasks"][0]["task_id"] - - reviews = client.get(f"/tasks/{task_id}/reviews").json() - review_id = reviews[0]["id"] + review_id = _review_id_for_task(task_id) role_name = f"{TEST_PREFIX}worker-{suffix}" role = _register_agent(client, role_name=role_name, supported_task_types=[role_name]) @@ -183,57 +211,181 @@ def test_approve_review_assigns_role_and_worker_executes_task() -> None: assert "task_review_resolved" in event_types -def test_approve_review_with_unsatisfied_dependency_moves_task_to_blocked() -> None: +def test_reassign_review_supersedes_previous_assignment_and_keeps_task_runnable() -> None: suffix = uuid.uuid4().hex[:8] response = client.post("/task-batches", json=_batch_payload(suffix)) assert response.status_code == 201 - dependent_task_id = response.json()["tasks"][1]["task_id"] + task_id = response.json()["tasks"][0]["task_id"] + review_id = _review_id_for_task(task_id) - review_id = client.get(f"/tasks/{dependent_task_id}/reviews").json()[0]["id"] - role = _register_agent( - client, - role_name=f"{TEST_PREFIX}blocked-{suffix}", - supported_task_types=[f"{TEST_PREFIX}blocked-{suffix}"], - ) + first_role = _register_agent(client, role_name=f"{TEST_PREFIX}first-{suffix}", supported_task_types=["dummy"]) + second_role = _register_agent(client, role_name=f"{TEST_PREFIX}second-{suffix}", supported_task_types=["dummy"]) - approve_response = client.post( - f"/reviews/{review_id}/approve", + engine = create_engine(_database_url()) + with Session(engine) as session: + session.add( + AssignmentORM( + task_id=task_id, + agent_role_id=first_role["id"], + routing_reason="stale assignment", + assignment_status="active", + ) + ) + task = session.get(TaskORM, task_id) + assert task is not None + task.assigned_agent_role = first_role["role_name"] + session.commit() + + reassign_response = client.post( + f"/reviews/{review_id}/reassign", json={ "reviewer": "bob", - "review_comment": "approved after manual routing", + "review_comment": "assign to fallback role", + "agent_role_id": second_role["id"], + }, + ) + assert reassign_response.status_code == 200 + assert reassign_response.json()["status"] == "queued" + assert reassign_response.json()["assigned_agent_role"] == second_role["role_name"] + + with Session(engine) as session: + assignments = session.query(AssignmentORM).filter(AssignmentORM.task_id == task_id).order_by(AssignmentORM.assigned_at.asc()).all() + assert len(assignments) == 2 + assert assignments[0].assignment_status == "superseded" + assert assignments[1].assignment_status == "active" + + events = client.get(f"/tasks/{task_id}/events").json() + assert "review_reassigned" in [event["event_type"] for event in events] + + +def test_bulk_approve_reviews_returns_per_item_results() -> None: + suffix = uuid.uuid4().hex[:8] + response = client.post("/task-batches", json=_batch_payload(suffix)) + assert response.status_code == 201 + task_ids = [item["task_id"] for item in response.json()["tasks"]] + review_ids = [_review_id_for_task(task_id) for task_id in task_ids] + role = _register_agent(client, role_name=f"{TEST_PREFIX}bulk-{suffix}", supported_task_types=["dummy"]) + + bulk_response = client.post( + "/reviews/bulk/approve", + json={ + "review_ids": [review_ids[0], "review_missing", review_ids[2]], + "reviewer": "alice", + "review_comment": "bulk approval", "agent_role_id": role["id"], }, ) - assert approve_response.status_code == 200 - assert approve_response.json()["status"] == "blocked" + assert bulk_response.status_code == 200 + items = bulk_response.json()["items"] + assert len(items) == 3 + assert items[0]["ok"] is True + assert items[0]["status"] == "queued" + assert items[1]["ok"] is False + assert "not found" in items[1]["detail"].lower() + assert items[2]["ok"] is True -def test_reject_review_marks_task_failed_and_prevents_execution() -> None: +def test_bulk_reject_handles_already_resolved_review_without_rolling_back_others() -> None: suffix = uuid.uuid4().hex[:8] response = client.post("/task-batches", json=_batch_payload(suffix)) assert response.status_code == 201 - task_id = response.json()["tasks"][0]["task_id"] + task_ids = [item["task_id"] for item in response.json()["tasks"]] + review_ids = [_review_id_for_task(task_id) for task_id in task_ids] + role = _register_agent(client, role_name=f"{TEST_PREFIX}once-{suffix}", supported_task_types=["dummy"]) - review_id = client.get(f"/tasks/{task_id}/reviews").json()[0]["id"] - reject_response = client.post( - f"/reviews/{review_id}/reject", + first_approve = client.post( + f"/reviews/{review_ids[0]}/approve", + json={"reviewer": "alice", "review_comment": "approve one", "agent_role_id": role["id"]}, + ) + assert first_approve.status_code == 200 + + bulk_reject = client.post( + "/reviews/bulk/reject", json={ + "review_ids": review_ids, "reviewer": "alice", - "review_comment": "insufficient confidence", + "review_comment": "bulk reject", }, ) - assert reject_response.status_code == 200 - assert reject_response.json()["status"] == "failed" + assert bulk_reject.status_code == 200 + items = bulk_reject.json()["items"] + assert len(items) == 3 + assert items[0]["ok"] is False + assert "cannot be decided" in items[0]["detail"] + assert items[1]["ok"] is True + assert items[1]["status"] == "failed" + assert items[2]["ok"] is True - engine = create_engine(_database_url()) - with Session(engine) as session: - assert run_next_task(session, AgentRegistry()) is None - assert session.query(ExecutionRunORM).filter(ExecutionRunORM.task_id == task_id).count() == 0 - events_response = client.get(f"/tasks/{task_id}/events") - event_types = [event["event_type"] for event in events_response.json()] - assert "review_rejected" in event_types - assert "task_review_resolved" in event_types +def test_process_timeouts_fail_closed_marks_task_failed() -> None: + suffix = uuid.uuid4().hex[:8] + response = client.post("/task-batches", json=_batch_payload(suffix)) + assert response.status_code == 201 + task_id = response.json()["tasks"][0]["task_id"] + review_id = _review_id_for_task(task_id) + _set_review_policy( + review_id, + timeout_policy="fail_closed", + deadline_at=datetime.now(timezone.utc) - timedelta(minutes=1), + ) + + timeout_response = client.post("/reviews/process-timeouts", json={"limit": 10}) + assert timeout_response.status_code == 200 + assert timeout_response.json()["processed_count"] == 1 + + task_response = client.get(f"/tasks/{task_id}") + assert task_response.status_code == 200 + assert task_response.json()["status"] == "failed" + + +def test_process_timeouts_cancel_task_marks_task_cancelled() -> None: + suffix = uuid.uuid4().hex[:8] + response = client.post("/task-batches", json=_batch_payload(suffix)) + assert response.status_code == 201 + task_id = response.json()["tasks"][0]["task_id"] + review_id = _review_id_for_task(task_id) + _set_review_policy( + review_id, + timeout_policy="cancel_task", + deadline_at=datetime.now(timezone.utc) - timedelta(minutes=1), + ) + + timeout_response = client.post("/reviews/process-timeouts", json={"limit": 10}) + assert timeout_response.status_code == 200 + assert timeout_response.json()["processed_count"] == 1 + + task_response = client.get(f"/tasks/{task_id}") + assert task_response.status_code == 200 + assert task_response.json()["status"] == "cancelled" + + +def test_process_timeouts_escalate_creates_new_pending_review_and_is_idempotent() -> None: + suffix = uuid.uuid4().hex[:8] + response = client.post("/task-batches", json=_batch_payload(suffix)) + assert response.status_code == 201 + task_id = response.json()["tasks"][0]["task_id"] + review_id = _review_id_for_task(task_id) + _set_review_policy( + review_id, + timeout_policy="escalate", + deadline_at=datetime.now(timezone.utc) - timedelta(minutes=1), + ) + + first_timeout = client.post("/reviews/process-timeouts", json={"limit": 10}) + assert first_timeout.status_code == 200 + assert first_timeout.json()["processed_count"] == 1 + + reviews = client.get(f"/tasks/{task_id}/reviews").json() + assert len(reviews) == 2 + assert reviews[0]["review_status"] == "rejected" + assert reviews[1]["review_status"] == "pending" + assert reviews[1]["reason_category"] == "manual_override" + + second_timeout = client.post("/reviews/process-timeouts", json={"limit": 10}) + assert second_timeout.status_code == 200 + assert second_timeout.json()["processed_count"] == 0 + reviews_after = client.get(f"/tasks/{task_id}/reviews").json() + assert len(reviews_after) == 2 def test_cannot_decide_review_after_task_is_cancelled() -> None: @@ -241,7 +393,7 @@ def test_cannot_decide_review_after_task_is_cancelled() -> None: response = client.post("/task-batches", json=_batch_payload(suffix)) assert response.status_code == 201 task_id = response.json()["tasks"][0]["task_id"] - review_id = client.get(f"/tasks/{task_id}/reviews").json()[0]["id"] + review_id = _review_id_for_task(task_id) cancel_response = client.post(f"/tasks/{task_id}/cancel", json={"reason": "user stop"}) assert cancel_response.status_code == 200 @@ -277,7 +429,7 @@ def test_cannot_approve_review_twice() -> None: response = client.post("/task-batches", json=_batch_payload(suffix)) assert response.status_code == 201 task_id = response.json()["tasks"][0]["task_id"] - review_id = client.get(f"/tasks/{task_id}/reviews").json()[0]["id"] + review_id = _review_id_for_task(task_id) role = _register_agent( client, role_name=f"{TEST_PREFIX}repeat-{suffix}", diff --git a/src/tests/test_task_batch_normalization_api.py b/src/tests/test_task_batch_normalization_api.py new file mode 100644 index 0000000..4cc93d4 --- /dev/null +++ b/src/tests/test_task_batch_normalization_api.py @@ -0,0 +1,207 @@ +import os +import sys +import uuid +from pathlib import Path + +from fastapi.testclient import TestClient +from sqlalchemy import create_engine, text + + +ROOT = Path(__file__).resolve().parents[2] +TEST_TITLE_PREFIX = "normalization-test-batch-" + +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + + +def _database_url() -> str: + database_url = os.getenv("DATABASE_URL") + if not database_url: + env_file = ROOT / ".env" + if env_file.exists(): + for line in env_file.read_text(encoding="utf-8").splitlines(): + if line.startswith("DATABASE_URL="): + database_url = line.split("=", 1)[1].strip() + break + if not database_url: + raise RuntimeError("DATABASE_URL is not set") + return database_url + + +def _cleanup_database() -> None: + engine = create_engine(_database_url()) + with engine.begin() as conn: + conn.execute(text("DELETE FROM task_batches")) + conn.execute(text("DELETE FROM agent_roles")) + + +_cleanup_database() + +from src.apps.api.app import app # noqa: E402 + + +client = TestClient(app) + + +def setup_function() -> None: + _cleanup_database() + + +def teardown_function() -> None: + _cleanup_database() + + +def _base_payload(tasks: list[dict]) -> dict: + suffix = uuid.uuid4().hex[:8] + return { + "title": f"{TEST_TITLE_PREFIX}{suffix}", + "description": "normalization batch", + "created_by": "pytest", + "metadata": {"suite": "normalization"}, + "tasks": tasks, + } + + +def test_submit_returns_normalization_metadata_for_exact_deduplication() -> None: + payload = _base_payload( + [ + { + "client_task_id": "task_1", + "title": "Research topic", + "description": "Research topic", + "task_type": "research", + "input_payload": {"topic": "ai"}, + }, + { + "client_task_id": "task_2", + "title": "Research topic", + "description": "Research topic", + "task_type": "research", + "input_payload": {"topic": "ai"}, + }, + { + "client_task_id": "task_3", + "title": "Write summary", + "task_type": "write_summary", + "input_payload": {"topic": "ai"}, + }, + ] + ) + + response = client.post("/task-batches", json=payload) + assert response.status_code == 201 + body = response.json() + assert body["original_task_count"] == 3 + assert body["normalized_task_count"] == 2 + assert len(body["tasks"]) == 2 + + deduped = next(item for item in body["normalization"] if item["client_task_id"] == "task_2") + assert deduped["action"] == "deduped" + assert deduped["effective_client_task_id"] == "task_1" + + +def test_submit_merges_similar_tasks_and_keeps_more_complete_item() -> None: + payload = _base_payload( + [ + { + "client_task_id": "task_1", + "title": "Implement API", + "description": "short", + "task_type": "implement", + "input_payload": {"service": "billing"}, + }, + { + "client_task_id": "task_2", + "title": " implement api ", + "description": "implement the billing API with retry and metrics", + "task_type": "implement", + "input_payload": {"service": "billing", "retry": True}, + }, + { + "client_task_id": "task_3", + "title": "Test API", + "task_type": "test", + "input_payload": {"service": "billing"}, + }, + ] + ) + + response = client.post("/task-batches", json=payload) + assert response.status_code == 201 + body = response.json() + assert body["normalized_task_count"] == 2 + merged = next(item for item in body["normalization"] if item["client_task_id"] == "task_2") + assert merged["action"] == "merged" + assert merged["effective_client_task_id"] == "task_1" + + +def test_submit_fills_missing_fields_marks_ambiguous_and_infers_dependency() -> None: + payload = _base_payload( + [ + { + "client_task_id": "task_1", + "title": "研究方案", + "task_type": "research", + "input_payload": {"topic": "ai"}, + }, + { + "client_task_id": "task_2", + "title": "处理一下", + "task_type": "unknown", + "input_payload": {}, + }, + { + "client_task_id": "task_3", + "title": "Write summary", + "task_type": "write_summary", + "input_payload": {"topic": "ai"}, + }, + ] + ) + + response = client.post("/task-batches", json=payload) + assert response.status_code == 201 + body = response.json() + + ambiguous = next(item for item in body["normalization"] if item["client_task_id"] == "task_2") + assert ambiguous["is_ambiguous"] is True + assert "description" in ambiguous["missing_fields_filled"] + assert "expected_output_schema" in ambiguous["missing_fields_filled"] + + summary_task = next(item for item in body["normalization"] if item["client_task_id"] == "task_3") + assert summary_task["inferred_dependency_client_task_ids"] == ["task_1"] + + created_summary = next(item for item in body["tasks"] if item["client_task_id"] == "task_3") + assert len(created_summary["dependency_ids"]) == 1 + + +def test_submit_normalization_does_not_create_cycle_from_inferred_dependency() -> None: + payload = _base_payload( + [ + { + "client_task_id": "task_1", + "title": "Implement service", + "task_type": "implement", + "input_payload": {"service": "x"}, + }, + { + "client_task_id": "task_2", + "title": "Test service", + "task_type": "test", + "input_payload": {"service": "x"}, + }, + { + "client_task_id": "task_3", + "title": "Review service", + "task_type": "review", + "input_payload": {"service": "x"}, + }, + ] + ) + + response = client.post("/task-batches", json=payload) + assert response.status_code == 201 + body = response.json() + assert body["normalized_task_count"] == 3 + review_item = next(item for item in body["normalization"] if item["client_task_id"] == "task_3") + assert review_item["inferred_dependency_client_task_ids"] == ["task_2"] diff --git a/src/tests/test_task_routing.py b/src/tests/test_task_routing.py index dc01ab0..71e4ed9 100644 --- a/src/tests/test_task_routing.py +++ b/src/tests/test_task_routing.py @@ -7,6 +7,9 @@ from fastapi.testclient import TestClient from sqlalchemy import create_engine, text +from sqlalchemy.orm import Session + +from src.packages.core.db.models import ExecutionRunORM ROOT = Path(__file__).resolve().parents[2] @@ -37,7 +40,13 @@ def _cleanup_database() -> None: conn.execute(text("DELETE FROM agent_roles")) -def _batch_payload(task_type: str, suffix: str) -> dict: +def _batch_payload( + task_type: str, + suffix: str, + *, + input_payload: dict | None = None, + expected_output_schema: dict | None = None, +) -> dict: return { "title": f"{ROUTING_PREFIX}batch-{suffix}", "description": "routing batch", @@ -49,8 +58,8 @@ def _batch_payload(task_type: str, suffix: str) -> dict: "title": f"{ROUTING_PREFIX}task-{suffix}-1", "task_type": task_type, "priority": "medium", - "input_payload": {"text": "hello"}, - "expected_output_schema": {"type": "object"}, + "input_payload": input_payload or {"text": "hello"}, + "expected_output_schema": expected_output_schema or {"type": "object"}, "dependency_client_task_ids": [], }, { @@ -58,8 +67,8 @@ def _batch_payload(task_type: str, suffix: str) -> dict: "title": f"{ROUTING_PREFIX}task-{suffix}-2", "task_type": task_type, "priority": "medium", - "input_payload": {"text": "world"}, - "expected_output_schema": {"type": "object"}, + "input_payload": input_payload or {"text": "world"}, + "expected_output_schema": expected_output_schema or {"type": "object"}, "dependency_client_task_ids": [], }, { @@ -67,8 +76,8 @@ def _batch_payload(task_type: str, suffix: str) -> dict: "title": f"{ROUTING_PREFIX}task-{suffix}-3", "task_type": task_type, "priority": "medium", - "input_payload": {"text": "!"}, - "expected_output_schema": {"type": "object"}, + "input_payload": input_payload or {"text": "!"}, + "expected_output_schema": expected_output_schema or {"type": "object"}, "dependency_client_task_ids": [], }, ], @@ -84,6 +93,7 @@ def _register_agent( input_properties: dict | None = None, output_type: str = "object", declare_schema: bool = True, + timeout_seconds: int = 300, ) -> dict: input_requirements = {"properties": input_properties or {"text": {"type": "string"}}} output_contract = {"type": output_type} @@ -105,7 +115,7 @@ def _register_agent( }, "input_schema": {}, "output_schema": {}, - "timeout_seconds": 300, + "timeout_seconds": timeout_seconds, "max_retries": 1, "enabled": True, "version": "1.0.0", @@ -120,6 +130,112 @@ def _register_agent( return response.json() +def _seed_history(role_id: str, role_name: str, *, run_statuses: list[str], suffix: str, prompt_tokens: int = 10, completion_tokens: int = 5, latency_ms: int = 100) -> None: + batch_id = f"batch_{uuid.uuid4().hex}" + task_id = f"task_{uuid.uuid4().hex}" + engine = create_engine(_database_url()) + with Session(engine) as session: + session.execute( + text( + """ + INSERT INTO task_batches ( + id, + title, + description, + created_by, + created_at, + status, + total_tasks, + metadata + ) VALUES ( + :id, + :title, + :description, + 'pytest', + NOW(), + 'submitted', + 1, + '{}'::jsonb + ) + """ + ), + { + "id": batch_id, + "title": f"{ROUTING_PREFIX}history-batch-{suffix}-{role_name}", + "description": "routing seeded batch", + }, + ) + session.execute( + text( + """ + INSERT INTO tasks ( + id, + batch_id, + title, + description, + task_type, + priority, + status, + input_payload, + expected_output_schema, + assigned_agent_role, + dependency_ids, + retry_count, + cancellation_requested, + cancellation_requested_at, + cancellation_reason, + created_at, + updated_at + ) VALUES ( + :id, + :batch_id, + :title, + :description, + 'generate', + 'medium', + 'success', + '{}'::jsonb, + '{}'::jsonb, + :assigned_agent_role, + ARRAY[]::varchar[], + 0, + FALSE, + NULL, + NULL, + NOW(), + NOW() + ) + """ + ), + { + "id": task_id, + "batch_id": batch_id, + "title": f"{ROUTING_PREFIX}history-task-{suffix}-{role_name}", + "description": "routing seeded task", + "assigned_agent_role": role_name, + }, + ) + for index, run_status in enumerate(run_statuses, start=1): + session.add( + ExecutionRunORM( + id=f"run_{uuid.uuid4().hex}", + task_id=task_id, + agent_role_id=role_id, + run_status=run_status, + logs=[], + input_snapshot={}, + output_snapshot={}, + token_usage={ + "prompt_tokens": prompt_tokens, + "completion_tokens": completion_tokens, + "total_tokens": prompt_tokens + completion_tokens, + }, + latency_ms=latency_ms * index, + ) + ) + session.commit() + + _cleanup_database() from src.apps.api.app import app # noqa: E402 @@ -152,7 +268,8 @@ def test_routes_tasks_by_exact_task_type() -> None: assert response.status_code == 201 tasks = response.json()["tasks"] assert all(task["assigned_agent_role"] == role["role_name"] for task in tasks) - assert all(task["routing_reason"] == "matched by task_type=generate" for task in tasks) + assert all("capability-ranked route selected role=" in task["routing_reason"] for task in tasks) + assert all("via task_type,capability,schema" in task["routing_reason"] for task in tasks) assert all(task["status"] == "queued" for task in tasks) @@ -170,23 +287,129 @@ def test_routes_tasks_by_capability_when_task_type_not_declared() -> None: assert response.status_code == 201 tasks = response.json()["tasks"] assert all(task["assigned_agent_role"] == role["role_name"] for task in tasks) - assert all(task["routing_reason"] == "matched by capability=task:write_summary" for task in tasks) + assert all("via capability,schema" in task["routing_reason"] for task in tasks) def test_routes_builtin_search_and_code_roles_by_capability() -> None: suffix = uuid.uuid4().hex[:8] - search_response = client.post("/task-batches", json=_batch_payload("research_topic", f"{suffix}-search")) + search_response = client.post( + "/task-batches", + json=_batch_payload("research_topic", f"{suffix}-search", input_payload={"query": "topic"}), + ) assert search_response.status_code == 201 search_tasks = search_response.json()["tasks"] assert all(task["assigned_agent_role"] == "search_agent" for task in search_tasks) - assert all(task["routing_reason"] == "matched by capability=task:research_topic" for task in search_tasks) + assert all("via capability,schema" in task["routing_reason"] for task in search_tasks) - code_response = client.post("/task-batches", json=_batch_payload("implement_feature", f"{suffix}-code")) + code_response = client.post("/task-batches", json=_batch_payload("implement_feature", f"{suffix}-code", input_payload={"prompt": "do it", "language": "python"})) assert code_response.status_code == 201 code_tasks = code_response.json()["tasks"] assert all(task["assigned_agent_role"] == "code_agent" for task in code_tasks) - assert all(task["routing_reason"] == "matched by capability=task:implement_feature" for task in code_tasks) + assert all("via capability,schema" in task["routing_reason"] for task in code_tasks) + + +def test_prefers_higher_success_rate_when_multiple_roles_match_same_task() -> None: + suffix = uuid.uuid4().hex[:8] + stable_role = _register_agent( + client, + role_name=f"{ROUTING_PREFIX}stable-{suffix}", + capabilities=["task:generate"], + supported_task_types=["generate"], + ) + flaky_role = _register_agent( + client, + role_name=f"{ROUTING_PREFIX}flaky-{suffix}", + capabilities=["task:generate"], + supported_task_types=["generate"], + ) + _seed_history(stable_role["id"], stable_role["role_name"], run_statuses=["success", "success", "failed"], suffix=suffix) + _seed_history(flaky_role["id"], flaky_role["role_name"], run_statuses=["success", "failed", "failed"], suffix=suffix) + + response = client.post("/task-batches", json=_batch_payload("generate", suffix)) + + assert response.status_code == 201 + tasks = response.json()["tasks"] + assert all(task["assigned_agent_role"] == stable_role["role_name"] for task in tasks) + assert all("success_rate=66.67%" in task["routing_reason"] for task in tasks) + + +def test_prefers_lower_cost_for_low_cost_hint_when_roles_otherwise_match() -> None: + suffix = uuid.uuid4().hex[:8] + cheap_role = _register_agent( + client, + role_name=f"{ROUTING_PREFIX}cheap-{suffix}", + capabilities=["task:generate"], + supported_task_types=["generate"], + ) + costly_role = _register_agent( + client, + role_name=f"{ROUTING_PREFIX}costly-{suffix}", + capabilities=["task:generate"], + supported_task_types=["generate"], + ) + _seed_history(cheap_role["id"], cheap_role["role_name"], run_statuses=["success"], suffix=suffix, prompt_tokens=10, completion_tokens=5) + _seed_history(costly_role["id"], costly_role["role_name"], run_statuses=["success"], suffix=suffix, prompt_tokens=500, completion_tokens=500) + + response = client.post( + "/task-batches", + json=_batch_payload("generate", suffix, input_payload={"text": "hello", "cost_hint": "low"}), + ) + + assert response.status_code == 201 + tasks = response.json()["tasks"] + assert all(task["assigned_agent_role"] == cheap_role["role_name"] for task in tasks) + + +def test_filters_roles_that_cannot_meet_timeout_requirement() -> None: + suffix = uuid.uuid4().hex[:8] + short_role = _register_agent( + client, + role_name=f"{ROUTING_PREFIX}short-{suffix}", + capabilities=["task:generate"], + supported_task_types=["generate"], + timeout_seconds=30, + ) + long_role = _register_agent( + client, + role_name=f"{ROUTING_PREFIX}long-{suffix}", + capabilities=["task:generate"], + supported_task_types=["generate"], + timeout_seconds=600, + ) + + response = client.post( + "/task-batches", + json=_batch_payload("generate", suffix, input_payload={"text": "hello", "timeout_seconds": 120}), + ) + + assert response.status_code == 201 + tasks = response.json()["tasks"] + assert all(task["assigned_agent_role"] == long_role["role_name"] for task in tasks) + assert all(task["assigned_agent_role"] != short_role["role_name"] for task in tasks) + + +def test_marks_tasks_waiting_review_when_all_candidates_filtered_out() -> None: + suffix = uuid.uuid4().hex[:8] + _register_agent( + client, + role_name=f"{ROUTING_PREFIX}short-{suffix}", + capabilities=["task:generate"], + supported_task_types=["generate"], + timeout_seconds=30, + ) + + response = client.post( + "/task-batches", + json=_batch_payload("generate", suffix, input_payload={"text": "hello", "timeout_seconds": 120}), + ) + + assert response.status_code == 201 + tasks = response.json()["tasks"] + assert all(task["assigned_agent_role"] is None for task in tasks) + assert all(task["needs_review"] is True for task in tasks) + assert all(task["status"] == "needs_review" for task in tasks) + assert all(task["routing_reason"] == "No eligible agent role found for task_type=generate" for task in tasks) def test_routes_tasks_to_default_worker_as_fallback() -> None: @@ -204,7 +427,7 @@ def test_routes_tasks_to_default_worker_as_fallback() -> None: assert response.status_code == 201 tasks = response.json()["tasks"] assert all(task["assigned_agent_role"] == role["role_name"] for task in tasks) - assert all(task["routing_reason"] == "fallback to default_worker" for task in tasks) + assert all("via default_worker" in task["routing_reason"] for task in tasks) def test_marks_tasks_waiting_review_when_no_role_matches() -> None: