Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 106 additions & 10 deletions autobot-backend/orchestration/workflow_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

Issue #381: Extracted from enhanced_orchestrator.py god class refactoring.
Contains workflow execution, step coordination, and agent interaction handling.

Issue #2168: Added circuit breaker + retry decorators to step execution.
Issue #2172: Added parallel execution for independent workflow steps.
"""

import asyncio
Expand All @@ -15,7 +18,13 @@
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional

from constants.threshold_constants import TimingConstants
from circuit_breaker import circuit_breaker_async
from constants.threshold_constants import (
CircuitBreakerDefaults,
RetryConfig,
TimingConstants,
)
from retry_mechanism import RetryStrategy, retry_async

from .types import AgentInteraction, AgentProfile

Expand Down Expand Up @@ -57,6 +66,52 @@ def __init__(
self._release_agent = release_agent_callback
self._update_performance = update_performance_callback

def _group_steps_by_dependency(
self, steps: List[Dict[str, Any]]
) -> List[List[Dict[str, Any]]]:
"""
Group workflow steps for parallel execution.

Steps with no unmet dependencies form a parallel group. Each group
runs concurrently; groups are ordered sequentially. Mirrors the
DependencyAnalyzer.get_parallel_groups pattern from
tools/parallel/analyzer.py. Issue #2172.

Args:
steps: List of workflow steps with optional 'dependencies' field

Returns:
Ordered list of groups; steps in each group run concurrently
"""
step_map = {s["id"]: s for s in steps}
remaining = [s["id"] for s in steps]
completed: set = set()
groups: List[List[Dict[str, Any]]] = []

while remaining:
ready_ids = [
sid
for sid in remaining
if all(
dep in completed for dep in step_map[sid].get("dependencies", [])
)
]

if not ready_ids:
logger.error(
"Circular dependency in workflow steps; falling back to sequential"
)
for sid in remaining:
groups.append([step_map[sid]])
break

groups.append([step_map[sid] for sid in ready_ids])
for sid in ready_ids:
remaining.remove(sid)
completed.update(ready_ids)

return groups

def _determine_workflow_status(
self, steps: List[Dict[str, Any]], execution_context: Dict[str, Any]
) -> None:
Expand Down Expand Up @@ -138,16 +193,17 @@ async def execute_coordinated_workflow(
}

try:
# Execute steps with dependency management (Issue #398: refactored)
for step in steps:
if not await self._check_step_dependencies(
step, execution_context["step_results"]
):
logger.warning("Step %s dependencies not met, skipping", step["id"])
step["status"] = "skipped"
continue
# Execute steps in dependency-ordered parallel groups (Issue #2172)
groups = self._group_steps_by_dependency(steps)
logger.info(
"Workflow %s: %d steps in %d parallel group(s)",
workflow_id,
len(steps),
len(groups),
)

await self._execute_step_with_agent(step, execution_context, context)
for group in groups:
await self._execute_step_group(group, execution_context, context)

self._determine_workflow_status(steps, execution_context)
return execution_context
Expand All @@ -158,6 +214,37 @@ async def execute_coordinated_workflow(
execution_context["error"] = str(e)
return execution_context

async def _execute_step_group(
self,
group: List[Dict[str, Any]],
execution_context: Dict[str, Any],
context: Dict[str, Any],
) -> None:
"""
Execute a group of steps concurrently using asyncio.gather.

Steps within a group have no inter-dependencies and are safe to run
in parallel. Issue #2172.

Args:
group: Steps to execute in parallel
execution_context: Shared execution context
context: Workflow context
"""
if len(group) == 1:
await self._execute_step_with_agent(group[0], execution_context, context)
return

logger.info(
"Executing %d steps in parallel: %s", len(group), [s["id"] for s in group]
)
await asyncio.gather(
*(
self._execute_step_with_agent(step, execution_context, context)
for step in group
)
)

async def _check_step_dependencies(
self,
step: Dict[str, Any],
Expand Down Expand Up @@ -272,6 +359,14 @@ def _build_step_failure_result(
"step_id": step_id,
}

@circuit_breaker_async(
"workflow_step_execution",
failure_threshold=CircuitBreakerDefaults.LLM_FAILURE_THRESHOLD,
recovery_timeout=CircuitBreakerDefaults.LLM_RECOVERY_TIMEOUT,
)
@retry_async(
max_attempts=RetryConfig.MIN_RETRIES, strategy=RetryStrategy.EXPONENTIAL_BACKOFF
)
async def _execute_coordinated_step(
self,
step: Dict[str, Any],
Expand All @@ -282,6 +377,7 @@ async def _execute_coordinated_step(
Execute a single workflow step with agent coordination.

Issue #620: Refactored to use extracted helper methods.
Issue #2168: Protected by circuit breaker and retry decorator.

Args:
step: The step to execute
Expand Down
Loading