diff --git a/autobot-backend/orchestration/workflow_executor.py b/autobot-backend/orchestration/workflow_executor.py index 8ab1ceada..d861db0a3 100644 --- a/autobot-backend/orchestration/workflow_executor.py +++ b/autobot-backend/orchestration/workflow_executor.py @@ -6,6 +6,9 @@ Issue #381: Extracted from enhanced_orchestrator.py god class refactoring. Contains workflow execution, step coordination, and agent interaction handling. + +Issue #2168: Added circuit breaker + retry decorators to step execution. +Issue #2172: Added parallel execution for independent workflow steps. """ import asyncio @@ -15,7 +18,13 @@ from datetime import datetime from typing import Any, Callable, Dict, List, Optional -from constants.threshold_constants import TimingConstants +from circuit_breaker import circuit_breaker_async +from constants.threshold_constants import ( + CircuitBreakerDefaults, + RetryConfig, + TimingConstants, +) +from retry_mechanism import RetryStrategy, retry_async from .types import AgentInteraction, AgentProfile @@ -57,6 +66,52 @@ def __init__( self._release_agent = release_agent_callback self._update_performance = update_performance_callback + def _group_steps_by_dependency( + self, steps: List[Dict[str, Any]] + ) -> List[List[Dict[str, Any]]]: + """ + Group workflow steps for parallel execution. + + Steps with no unmet dependencies form a parallel group. Each group + runs concurrently; groups are ordered sequentially. Mirrors the + DependencyAnalyzer.get_parallel_groups pattern from + tools/parallel/analyzer.py. Issue #2172. + + Args: + steps: List of workflow steps with optional 'dependencies' field + + Returns: + Ordered list of groups; steps in each group run concurrently + """ + step_map = {s["id"]: s for s in steps} + remaining = [s["id"] for s in steps] + completed: set = set() + groups: List[List[Dict[str, Any]]] = [] + + while remaining: + ready_ids = [ + sid + for sid in remaining + if all( + dep in completed for dep in step_map[sid].get("dependencies", []) + ) + ] + + if not ready_ids: + logger.error( + "Circular dependency in workflow steps; falling back to sequential" + ) + for sid in remaining: + groups.append([step_map[sid]]) + break + + groups.append([step_map[sid] for sid in ready_ids]) + for sid in ready_ids: + remaining.remove(sid) + completed.update(ready_ids) + + return groups + def _determine_workflow_status( self, steps: List[Dict[str, Any]], execution_context: Dict[str, Any] ) -> None: @@ -138,16 +193,17 @@ async def execute_coordinated_workflow( } try: - # Execute steps with dependency management (Issue #398: refactored) - for step in steps: - if not await self._check_step_dependencies( - step, execution_context["step_results"] - ): - logger.warning("Step %s dependencies not met, skipping", step["id"]) - step["status"] = "skipped" - continue + # Execute steps in dependency-ordered parallel groups (Issue #2172) + groups = self._group_steps_by_dependency(steps) + logger.info( + "Workflow %s: %d steps in %d parallel group(s)", + workflow_id, + len(steps), + len(groups), + ) - await self._execute_step_with_agent(step, execution_context, context) + for group in groups: + await self._execute_step_group(group, execution_context, context) self._determine_workflow_status(steps, execution_context) return execution_context @@ -158,6 +214,37 @@ async def execute_coordinated_workflow( execution_context["error"] = str(e) return execution_context + async def _execute_step_group( + self, + group: List[Dict[str, Any]], + execution_context: Dict[str, Any], + context: Dict[str, Any], + ) -> None: + """ + Execute a group of steps concurrently using asyncio.gather. + + Steps within a group have no inter-dependencies and are safe to run + in parallel. Issue #2172. + + Args: + group: Steps to execute in parallel + execution_context: Shared execution context + context: Workflow context + """ + if len(group) == 1: + await self._execute_step_with_agent(group[0], execution_context, context) + return + + logger.info( + "Executing %d steps in parallel: %s", len(group), [s["id"] for s in group] + ) + await asyncio.gather( + *( + self._execute_step_with_agent(step, execution_context, context) + for step in group + ) + ) + async def _check_step_dependencies( self, step: Dict[str, Any], @@ -272,6 +359,14 @@ def _build_step_failure_result( "step_id": step_id, } + @circuit_breaker_async( + "workflow_step_execution", + failure_threshold=CircuitBreakerDefaults.LLM_FAILURE_THRESHOLD, + recovery_timeout=CircuitBreakerDefaults.LLM_RECOVERY_TIMEOUT, + ) + @retry_async( + max_attempts=RetryConfig.MIN_RETRIES, strategy=RetryStrategy.EXPONENTIAL_BACKOFF + ) async def _execute_coordinated_step( self, step: Dict[str, Any], @@ -282,6 +377,7 @@ async def _execute_coordinated_step( Execute a single workflow step with agent coordination. Issue #620: Refactored to use extracted helper methods. + Issue #2168: Protected by circuit breaker and retry decorator. Args: step: The step to execute