Description
Priority: P2 — Core of the multi-step execution path
File: src/tool_classifier/workflows/service_workflow.py
Type: New methods
Add two new methods:
execute_direct_step(request, time_metric) -> Optional[OrchestrationResponse]
Used by the non-streaming path. Calls _parse_service_prefix(request.message), then calls _call_service_endpoint() directly (no discovery, no intent detection, no entity extraction — the URL is already known). Returns a populated OrchestrationResponse with content and buttons from the structured result.
execute_direct_step_streaming(request, time_metric) -> Optional[AsyncIterator[str]]
Used by the streaming path. Same logic but wraps the response in the SSE generator:
async def step_stream():
    yield orchestration_service.format_sse(chat_id, result["content"], result["buttons"])
    yield orchestration_service.format_sse(chat_id, "END")
Both methods must log "[chat_id] DIRECT STEP: {url}" once the URL has been parsed, and must return None if parsing fails so that the caller can fall back gracefully.
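The two methods described above could be sketched roughly as follows. This is a minimal illustration, not the project's implementation: the Request and OrchestrationResponse dataclasses, the "#service:" prefix syntax, the return type of _parse_service_prefix (assumed to be the URL or None), the async signature of _call_service_endpoint, and the body of format_sse are all assumptions made so the example runs on its own. time_metric is accepted but unused here because the issue does not say how it is recorded.

```python
import asyncio
import json
import logging
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, Dict, List, Optional

logger = logging.getLogger(__name__)


@dataclass
class Request:  # hypothetical stand-in for the real request type
    chat_id: str
    message: str


@dataclass
class OrchestrationResponse:  # hypothetical stand-in; the real fields may differ
    chat_id: str
    content: str
    buttons: List[Dict[str, Any]] = field(default_factory=list)


def format_sse(chat_id: str, content: str, buttons: Optional[list] = None) -> str:
    # Stand-in for orchestration_service.format_sse; the payload layout is assumed.
    payload = {"chatId": chat_id, "content": content, "buttons": buttons or []}
    return "data: " + json.dumps(payload) + "\n\n"


class ServiceWorkflow:
    PREFIX = "#service:"  # assumed prefix syntax, not taken from the issue

    def _parse_service_prefix(self, message: str) -> Optional[str]:
        # Assumed contract: return the service URL, or None if there is no prefix.
        if not message.startswith(self.PREFIX):
            return None
        return message[len(self.PREFIX):].strip() or None

    async def _call_service_endpoint(self, url: str) -> Dict[str, Any]:
        # Placeholder for the real HTTP call; returns a structured result.
        return {"content": f"called {url}", "buttons": []}

    async def execute_direct_step(
        self, request: Request, time_metric: Dict[str, float]
    ) -> Optional[OrchestrationResponse]:
        url = self._parse_service_prefix(request.message)
        if url is None:
            return None  # caller falls back to the regular workflow
        logger.info("[%s] DIRECT STEP: %s", request.chat_id, url)
        result = await self._call_service_endpoint(url)
        return OrchestrationResponse(request.chat_id, result["content"], result["buttons"])

    async def execute_direct_step_streaming(
        self, request: Request, time_metric: Dict[str, float]
    ) -> Optional[AsyncIterator[str]]:
        url = self._parse_service_prefix(request.message)
        if url is None:
            return None  # decided before any SSE chunk is produced
        logger.info("[%s] DIRECT STEP: %s", request.chat_id, url)
        result = await self._call_service_endpoint(url)

        async def step_stream() -> AsyncIterator[str]:
            # One content chunk, then the END marker, as in the issue text.
            yield format_sse(request.chat_id, result["content"], result["buttons"])
            yield format_sse(request.chat_id, "END")

        return step_stream()
```

Parsing happens before the generator is created, so the streaming variant can still return None (rather than an empty stream) and the caller can choose the fallback path before sending anything to the client.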
Work:
- Implement execute_direct_step().
- Implement execute_direct_step_streaming().
- Add unit tests for both (mock _call_service_endpoint).
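One possible shape for the unit tests, mocking _call_service_endpoint with unittest.mock.AsyncMock. The ServiceWorkflow class below is a hypothetical stand-in included only so the tests run on their own; the "#service:" prefix, the async signatures, and the request attributes are assumptions. Only the non-streaming method is covered here; a streaming test would follow the same pattern and collect the yielded chunks.

```python
import unittest
from types import SimpleNamespace
from typing import Optional
from unittest.mock import AsyncMock, patch


class ServiceWorkflow:
    """Hypothetical stand-in; replace with the real class under test."""

    def _parse_service_prefix(self, message: str) -> Optional[str]:
        # Assumed contract: return the URL, or None when there is no prefix.
        prefix = "#service:"  # assumed syntax
        if not message.startswith(prefix):
            return None
        return message[len(prefix):] or None

    async def _call_service_endpoint(self, url: str) -> dict:
        raise RuntimeError("network call; must be mocked in tests")

    async def execute_direct_step(self, request, time_metric):
        url = self._parse_service_prefix(request.message)
        if url is None:
            return None
        result = await self._call_service_endpoint(url)
        return SimpleNamespace(content=result["content"], buttons=result["buttons"])


class ExecuteDirectStepTests(unittest.IsolatedAsyncioTestCase):
    async def test_calls_endpoint_with_parsed_url(self):
        workflow = ServiceWorkflow()
        fake = AsyncMock(return_value={"content": "Step 2", "buttons": [{"title": "Next"}]})
        with patch.object(workflow, "_call_service_endpoint", fake):
            request = SimpleNamespace(chat_id="c1", message="#service:http://svc/step2")
            response = await workflow.execute_direct_step(request, time_metric={})
        # The endpoint is hit exactly once, with the URL taken from the message.
        fake.assert_awaited_once_with("http://svc/step2")
        self.assertEqual(response.content, "Step 2")
        self.assertEqual(response.buttons, [{"title": "Next"}])

    async def test_returns_none_when_prefix_missing(self):
        workflow = ServiceWorkflow()
        fake = AsyncMock()
        with patch.object(workflow, "_call_service_endpoint", fake):
            request = SimpleNamespace(chat_id="c1", message="plain message")
            response = await workflow.execute_direct_step(request, time_metric={})
        # No prefix means no endpoint call and a None result for fallback.
        self.assertIsNone(response)
        fake.assert_not_awaited()
```

Saved as a test module, this can be run with python -m unittest.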