Main workflow orchestrator combining DSPy and agent-framework.
SupervisorWorkflow(
context: SupervisorContext,
workflow_runner: Optional[Workflow] = None,
dspy_supervisor: Optional[Any] = None,
**kwargs
)

Parameters:
context: Required SupervisorContext instance containing config, agents, and tools.
workflow_runner: Optional Agent Framework Workflow instance.
Attributes:
context: SupervisorContext instance
config: WorkflowConfig instance
dspy_reasoner: DSPyReasoner instance
agents: Dictionary of ChatAgent instances
tool_registry: ToolRegistry instance
history_manager: HistoryManager instance
Execute workflow for a given task (non-streaming).
Parameters:
task: Task description string
Returns:
{
"result": str, # Final execution result
"routing": { # Routing decision
"mode": str, # "parallel", "sequential", or "delegated"
"assigned_to": List[str],
"subtasks": List[str],
},
"quality": { # Quality assessment
"score": float, # 0-10 scale
"missing": str, # Missing elements
"improvements": str # Suggested improvements
},
"execution_summary": Dict[str, Any]
}

Raises:
ValueError: If task is empty or too long
AgentExecutionError: If agent execution fails
RoutingError: If routing fails
HistoryError: If history save fails
Example:
from agentic_fleet.workflows.supervisor_workflow import create_supervisor_workflow
workflow = await create_supervisor_workflow(compile_dspy=True)
result = await workflow.run("Analyze the impact of AI")
print(f"Result: {result['result']}")
print(f"Quality: {result['quality']['score']}/10")

Execute workflow with streaming events.
Parameters:
task: Task description string
Yields:
MagenticAgentMessageEvent: Agent messages during execution
WorkflowOutputEvent: Final result with complete data
Example:
from agentic_fleet.workflows.supervisor_workflow import create_supervisor_workflow
workflow = await create_supervisor_workflow(compile_dspy=True)
async for event in workflow.run_stream("Your task"):
if hasattr(event, 'agent_id'):
print(f"{event.agent_id}: {event.message.text}")
elif hasattr(event, 'data'):
print(f"Final result: {event.data['result']}")

Configuration dataclass for workflow execution.
Attributes:
max_rounds: int = 15 # Max agent conversation turns
max_stalls: int = 3 # Max stuck iterations
max_resets: int = 2 # Max workflow resets
enable_streaming: bool = True # Stream events
parallel_threshold: int = 3 # Min agents for parallel
dspy_model: str = "gpt-4.1" # DSPy model
compile_dspy: bool = True # Enable compilation
refinement_threshold: float = 8.0 # Quality threshold
enable_refinement: bool = True # Auto-refine
enable_completion_storage: bool = False # OpenAI storage
agent_models: Optional[Dict[str, str]] # Per-agent models
agent_temperatures: Optional[Dict[str, float]] # Per-agent temps
history_format: str = "jsonl" # "jsonl" or "json"

Example:
from agentic_fleet.workflows.config import WorkflowConfig
config = WorkflowConfig(
dspy_model="gpt-5-mini",
refinement_threshold=9.0,
max_rounds=20,
)

DSPy module for intelligent task analysis and routing.
Analyze task complexity and requirements.
Parameters:
task: Task to analyze
use_tools: Whether to use tools during analysis
Returns:
{
"complexity": str, # "simple", "moderate", "complex"
"capabilities": List[str], # Required capabilities
"steps": int, # Estimated steps
"tool_requirements": List[str], # Required tools
"needs_web_search": bool # Whether web search needed
}

Route task to appropriate agents.
Parameters:
task: Task to route
team: Dictionary of agent names to descriptions
Returns:
{
"assigned_to": List[str], # Agent names
"mode": str, # Execution mode
"subtasks": List[str], # Subtasks (if parallel)
"tool_requirements": List[str]
}

Assess quality of execution results.
Parameters:
requirements: Original task requirements
results: Execution results
Returns:
{
"score": float, # 0-10 scale
"missing": str, # Missing elements
"improvements": str # Suggested improvements
}

Centralized registry for tool metadata and capabilities.
Register a tool with metadata.
Parameters:
name: Tool name
tool: Tool instance
agent: Agent name that has the tool
capabilities: List of capability tags (optional)
use_cases: List of use case descriptions (optional)
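As a minimal self-contained sketch of the registration pattern described above (this is illustrative only — the dataclass layout and internal dict are assumptions, not the library's implementation):

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List

# Illustrative registry entry; the real ToolRegistry in
# agentic_fleet.utils.tool_registry may structure this differently.
@dataclass
class ToolEntry:
    tool: Any
    agent: str
    capabilities: List[str] = field(default_factory=list)
    use_cases: List[str] = field(default_factory=list)

class MiniRegistry:
    def __init__(self) -> None:
        self._tools: Dict[str, ToolEntry] = {}

    def register(self, name, tool, agent, capabilities=None, use_cases=None):
        self._tools[name] = ToolEntry(tool, agent, capabilities or [], use_cases or [])

    def get_tools_for_agent(self, agent: str) -> Dict[str, ToolEntry]:
        return {n: e for n, e in self._tools.items() if e.agent == agent}

    def get_tools_by_capability(self, tag: str) -> Dict[str, ToolEntry]:
        return {n: e for n, e in self._tools.items() if tag in e.capabilities}

registry = MiniRegistry()
registry.register("tavily_search", object(), agent="Researcher", capabilities=["web_search"])
print(list(registry.get_tools_by_capability("web_search")))  # ['tavily_search']
```

Capability tags let the DSPy router ask "which tools can do X" without knowing which agent holds each tool.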
Get formatted tool descriptions for DSPy prompts.
Parameters:
agent_filter: Return only tools for this agent (optional)
Returns: Formatted string of tool descriptions
Get all tools available to a specific agent.
Find tools by capability tag.
Example:
from agentic_fleet.utils.tool_registry import ToolRegistry
registry = ToolRegistry()
# Get all tools with web_search capability
search_tools = registry.get_tools_by_capability("web_search")

Manages execution history storage and retrieval.
HistoryManager(
history_format: str = "jsonl",
max_entries: Optional[int] = None
)

Parameters:
history_format: "jsonl" or "json"
max_entries: Maximum entries to keep (auto-rotation)
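The JSONL format and auto-rotation behavior can be sketched with the standard library alone (a hedged illustration of the storage format, not the HistoryManager source; function names here are hypothetical):

```python
import json
from pathlib import Path
from typing import Any, Dict, List, Optional

def load_entries(path: Path, limit: Optional[int] = None) -> List[Dict[str, Any]]:
    # JSONL: one JSON object per line; limit returns the most recent entries.
    if not path.exists():
        return []
    entries = [json.loads(line) for line in path.read_text().splitlines() if line.strip()]
    return entries if limit is None else entries[-limit:]

def save_entry(path: Path, entry: Dict[str, Any], max_entries: Optional[int] = None) -> Path:
    entries = load_entries(path)
    entries.append(entry)
    if max_entries is not None:
        entries = entries[-max_entries:]  # auto-rotation: keep only the newest
    path.write_text("".join(json.dumps(e) + "\n" for e in entries))
    return path
```

One-object-per-line makes appends cheap and lets `limit` reads avoid parsing the whole file in a streaming implementation.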
Save execution to history file.
Returns: Path to history file
Raises: HistoryError if save fails
Load execution history.
Parameters:
limit: Maximum entries to return (None for all)
Returns: List of execution dictionaries
Get statistics about execution history.
Returns:
{
"total_executions": int,
"total_time_seconds": float,
"average_time_seconds": float,
"average_quality_score": float,
"format": str
}

Clear execution history.
Parameters:
keep_recent: Number of recent entries to keep (0 to clear all)
Base exception for all workflow errors.
Raised when agent execution fails.
Attributes:
agent_name: Name of failed agent
task: Task that failed
original_error: Original exception
Raised when routing fails or produces invalid results.
Attributes:
routing_decision: The invalid routing decision (if available)
Raised when configuration is invalid.
Attributes:
config_key: Configuration key that failed (if available)
Raised when history operations fail.
Attributes:
history_file: Path to history file (if available)
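The hierarchy above can be sketched as follows — the class and attribute names come from this reference, but the constructor signatures are assumptions:

```python
class WorkflowError(Exception):
    """Base exception for all workflow errors."""

class AgentExecutionError(WorkflowError):
    # Attribute names per this reference; constructor signature assumed.
    def __init__(self, agent_name, task, original_error=None):
        super().__init__(f"Agent '{agent_name}' failed on task: {task}")
        self.agent_name = agent_name
        self.task = task
        self.original_error = original_error

class RoutingError(WorkflowError):
    def __init__(self, message, routing_decision=None):
        super().__init__(message)
        self.routing_decision = routing_decision

# Catching the base class covers every workflow failure mode:
try:
    raise AgentExecutionError("Researcher", "Analyze the impact of AI")
except WorkflowError as e:
    print(type(e).__name__)  # AgentExecutionError
```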
Factory function to create and initialize workflow.
Parameters:
compile_dspy: Whether to compile DSPy module
Returns: Initialized SupervisorWorkflow instance
Example:
from agentic_fleet.workflows.supervisor_workflow import create_supervisor_workflow
# With compilation
workflow = await create_supervisor_workflow(compile_dspy=True)
# Skip compilation for faster startup
workflow = await create_supervisor_workflow(compile_dspy=False)

Load configuration from YAML file.
Parameters:
config_path: Path to config file (defaults to src/agentic_fleet/config/workflow_config.yaml)
Returns: Configuration dictionary
Example:
from agentic_fleet.core.config import load_config
config = load_config()

Validate configuration using Pydantic.
Parameters:
config_dict: Configuration dictionary to validate
Returns: Validated WorkflowConfigSchema instance
Raises: ConfigurationError if invalid
Example:
from agentic_fleet.core.config import validate_config
from agentic_fleet.workflows.exceptions import ConfigurationError
try:
validated = validate_config(config_dict)
except ConfigurationError as e:
print(f"Invalid config: {e}")

Compile DSPy supervisor module with training examples.
Parameters:
module: DSPy module to compile
examples_path: Path to training examples JSON
use_cache: Whether to use/save cache
Returns: Compiled DSPy module
Example:
from agentic_fleet.utils.compiler import compile_supervisor
from agentic_fleet.dspy_modules.reasoner import DSPyReasoner
# ...
supervisor = DSPyReasoner()
compiled = compile_supervisor(
supervisor,
examples_path="data/supervisor_examples.json",
use_cache=True
)

Clear compiled module cache.
Example:
from agentic_fleet.utils.compiler import clear_cache
clear_cache()

Get cache metadata and statistics.
Example:
from agentic_fleet.utils.compiler import get_cache_info
info = get_cache_info()
if info:
print(f"Cache size: {info['cache_size_bytes']} bytes")

Returns:
{
"cache_path": str,
"cache_size_bytes": int,
"cache_mtime": str,
"version": int,
"examples_path": str,
"created_at": str
}

Key type definitions used throughout:
from typing import Dict, Any, List, Optional, AsyncIterator
from agentic_fleet.utils.models import ExecutionMode, RoutingDecision
AgentDict = Dict[str, ChatAgent]
QualityAssessment = Dict[str, Any]
ExecutionResult = Dict[str, Any]

Note: RoutingDecision and ExecutionMode are defined in src.agentic_fleet.utils.models.
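For type-checking code that consumes these loose `Dict[str, Any]` payloads, the documented shapes can be expressed as TypedDicts — illustrative aliases only, not classes exported by agentic_fleet:

```python
from typing import List, TypedDict

# Mirrors the "routing" and "quality" shapes documented for run().
class RoutingDict(TypedDict):
    mode: str               # "parallel", "sequential", or "delegated"
    assigned_to: List[str]
    subtasks: List[str]

class QualityDict(TypedDict):
    score: float            # 0-10 scale
    missing: str
    improvements: str

quality: QualityDict = {"score": 8.5, "missing": "", "improvements": "Add citations"}
print(quality["score"] >= 8.0)  # True
```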
Agent message during execution.
Attributes:
agent_id: Agent identifier
message: EventMessage with text content
Final workflow result.
Attributes:
data: Dictionary with result, routing, quality, execution_summary
source_executor_id: Executor that generated the event
from agentic_fleet.utils.compiler import CACHE_VERSION
# CACHE_VERSION = 1 # Current cache version for invalidation

The Agentic Fleet exposes a FastAPI-based REST API for interacting with the system.
Enhanced health check with dependency verification (v0.6.7+).
Response:
{
"status": "ok",
"checks": {
"api": "ok",
"workflow": "ok",
"session_manager": "ok",
"conversations": "ok"
},
"version": "<package version>"
}

Status Values:
ok: All dependencies healthy
degraded: Some dependencies unavailable
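The ok/degraded rule can be sketched as a simple aggregation over the per-dependency checks (an assumption about the endpoint's logic, not its source):

```python
from typing import Dict

def overall_status(checks: Dict[str, str]) -> str:
    # "ok" only when every dependency check passes; otherwise "degraded".
    return "ok" if all(v == "ok" for v in checks.values()) else "degraded"

checks = {"api": "ok", "workflow": "ok", "session_manager": "error", "conversations": "ok"}
print(overall_status(checks))  # degraded
```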
Readiness check for load balancers.
Response:
{
"status": "ready",
"workflow": true
}

Base URL: /api/v1
Execute a workflow for a given task.
Request Body: WorkflowRequest
{
"task": "Your task description",
"config": {
"dspy_model": "gpt-4.1",
"max_rounds": 10
}
}

Response: WorkflowResponse
Trigger a self-improvement/optimization run (background task).
Request Body: OptimizationRequest
{
"iterations": 3,
"task": "Benchmark task",
"compile_dspy": true
}

Response: OptimizationResult (pending status)
Check status of an optimization run.
Response: OptimizationResult
Retrieve execution history.
Query Parameters:
limit: Number of entries (default: 20)
min_quality: Minimum quality score filter (default: 0.0)
Response: HistoryResponse
Trigger self-improvement from history.
Request Body: SelfImprovementRequest
{
"min_quality": 8.0,
"max_examples": 20
}

Response: SelfImprovementResponse
List available agents and their capabilities.
Response: AgentListResponse