Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions autobot-backend/initialization/lifespan.py
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,59 @@ async def _recover_index_queue():
logger.warning("Index queue recovery failed (non-fatal): %s", e)


async def _wire_scheduler_executor() -> None:
"""Wire the orchestration WorkflowExecutor into the global WorkflowScheduler (#2166).

Replaces the scheduler's fallback _default_template_executor with an adapter
that delegates non-template scheduled workflows to EnhancedOrchestrator, enabling
full agent coordination, step dependency management, and auto-documentation for
all scheduled workflows — not just template-based ones.

NON-CRITICAL: template-based workflows still work if this fails.
"""
logger.info("[ 98%%] Scheduler: Wiring orchestration executor...")
try:
from enhanced_orchestrator import EnhancedOrchestrator
from workflow_scheduler import ScheduledWorkflow, workflow_scheduler

orchestrator = EnhancedOrchestrator()

async def _orchestration_executor(workflow: ScheduledWorkflow):
"""Adapter: ScheduledWorkflow → EnhancedOrchestrator.execute_enhanced_workflow.

Routes template-based workflows to the template executor and
non-template workflows through the full orchestration pipeline.
Ref: #2166.
"""
if workflow.template_id:
from workflow_scheduler import _default_template_executor

return await _default_template_executor(workflow)

context = {
"workflow_id": workflow.id,
"user_id": workflow.user_id,
"variables": workflow.variables or {},
"tags": workflow.tags or [],
"auto_approve": workflow.auto_approve,
"scheduled": True,
}
result = await orchestrator.execute_enhanced_workflow(
user_request=workflow.user_message,
context=context,
auto_document=True,
)
succeeded = result.get("status") in ("completed", "partially_completed")
return {"success": succeeded, **result}

workflow_scheduler.set_workflow_executor(_orchestration_executor)
logger.info("[ 98%%] Scheduler: Orchestration executor wired")
except Exception as e:
logger.warning(
"Scheduler executor wiring failed (template-only fallback active): %s", e
)


async def initialize_background_services(app: FastAPI):
"""
Phase 2: Initialize background services (NON-BLOCKING).
Expand Down Expand Up @@ -874,6 +927,7 @@ async def initialize_background_services(app: FastAPI):
await _recover_index_queue()
await _init_process_adapter(app)
await _seed_agent_registry()
await _wire_scheduler_executor()

await update_app_state_multi(
initialization_status="ready",
Expand Down
Loading