LCORE-1198: Refactor of query events parsing #1009
Conversation
Walkthrough
This PR refactors tool call and response data handling across the query and streaming endpoints, introducing stronger typing for OpenAI response outputs, consolidating tool call summary generation with multiple output type branches, converting error streaming from async to sync, and adding utility functions for argument parsing and shield violation streaming.
Changes
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Possibly related PRs
Suggested labels
Suggested reviewers
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
✨ Finishing touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/utils/types.py (1)
174-199: Ensure tool result content stays valid JSON when responses contain complex types.
content_to_str falls back to str() for unhandled types, which yields Python repr format with single quotes for dicts. Since ToolResultSummary.content expects a string and downstream code in _extract_rag_chunks_from_response parses JSON, consider serializing dict/list via json.dumps before assignment to preserve valid JSON.
Suggested adjustment
```diff
-        response_content = content_to_str(resp.content) if resp else None
+        if resp:
+            if isinstance(resp.content, (dict, list)):
+                response_content = json.dumps(resp.content)
+            else:
+                response_content = content_to_str(resp.content)
+        else:
+            response_content = None
```
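As a quick illustration of why the repr fallback breaks downstream parsing, here is a standalone sketch with a made-up payload (not code from this repository):

```python
import json

payload = {"file_id": "doc-1", "score": 0.92}  # hypothetical tool result content

as_repr = str(payload)         # "{'file_id': 'doc-1', 'score': 0.92}" - single quotes, not JSON
as_json = json.dumps(payload)  # '{"file_id": "doc-1", "score": 0.92}' - valid JSON

json.loads(as_json)  # parses fine downstream
# json.loads(as_repr) raises json.JSONDecodeError, which is the failure mode described above
```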
🤖 Fix all issues with AI agents
In `@src/app/endpoints/query_v2.py`:
- Around line 165-176: The ToolResultSummary.content currently assigns
item.error or item.output directly (via the local variable content), which can
be dict/list and violates the str type; update the code that builds content
(used when constructing ToolResultSummary in this block) to serialize non-string
values with json.dumps (e.g., if not isinstance(content, str): content =
json.dumps(content)) before passing to ToolResultSummary so
item.error/item.output are always strings; ensure to import json if not already
and preserve the existing "success"/"failure" logic and the surrounding
ToolCallSummary/ToolResultSummary construction.
- Around line 123-127: The current construction of response_payload assumes
every entry in item.results has model_dump(), causing AttributeError for dict
entries; update the logic that builds response_payload (where response_payload
is set from item.results) to iterate results and for each result use it directly
if isinstance(result, dict) otherwise call result.model_dump(); ensure this
change is applied to the block that creates the {"results": [...] } payload so
both dicts and model objects are handled safely.
In `@src/app/endpoints/streaming_query_v2.py`:
- Around line 243-267: The error handling for event_type "response.failed" must
guard against failed_chunk.response.error.message being None before calling
InternalServerErrorResponse.query_failed(cause: str); update the branch that
sets error_message (in the "response.failed" branch using failed_chunk /
latest_response_object) to check message truthiness (e.g., if
failed_chunk.response.error and failed_chunk.response.error.message) and fall
back to the generic string before passing to
InternalServerErrorResponse.query_failed, and keep the existing logging/yield
behavior.
In `@src/models/responses.py`:
- Around line 1887-1902: Update the docstring for the classmethod query_failed
on InternalServerErrorResponse to use Google-style docstrings: change the
section header "Parameters:" to "Args:" and keep the rest of the docstring
content (cause description and return description) unchanged so the docstring
follows the project's Google Python conventions.
In `@src/utils/query.py`:
- Around line 60-118: create_violation_stream emits events with
content_index/output_index 0→1→2 while response_obj.output has a single element;
update the emitted events so all references point to the single output at index
0. Specifically, in create_violation_stream change the
OpenAIResponseObjectStreamResponseContentPartAdded,
OpenAIResponseObjectStreamResponseOutputTextDelta, and
OpenAIResponseObjectStreamResponseOutputTextDone yields to use content_index=0
and output_index=0 (keep their item_id/sequence_number values as needed), and
ensure the final response_obj.output remains the single element referenced by
those indices.
🧹 Nitpick comments (1)
src/app/endpoints/query_v2.py (1)
109-116: Guard against None arguments before parsing.
If Llama Stack can emit null/missing arguments, parse_arguments_string will raise on .strip(). Consider a safe fallback.
🔧 Suggested defensive guard
```diff
-            args=parse_arguments_string(item.arguments),
+            args=parse_arguments_string(item.arguments or ""),
 ...
-        args = parse_arguments_string(item.arguments)
+        args = parse_arguments_string(item.arguments or "")
 ...
-        args = parse_arguments_string(item.arguments)
+        args = parse_arguments_string(item.arguments or "")
```
Also applies to: 162-163, 212-213
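The repository's parse_arguments_string is not shown in this excerpt; as a rough sketch only, a defensive variant that tolerates a missing value could look like this (the name and behaviour below are assumptions, not the project's actual helper):

```python
import json
from typing import Any, Optional


def parse_arguments_string_safe(arguments: Optional[str]) -> dict[str, Any]:
    # Tolerate None or blank input instead of failing on .strip()
    if not arguments or not arguments.strip():
        return {}
    try:
        parsed = json.loads(arguments)
    except json.JSONDecodeError:
        # Fall back to an empty mapping when the arguments are not valid JSON
        return {}
    return parsed if isinstance(parsed, dict) else {}
```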
```diff
 response_payload: Optional[dict[str, Any]] = None
 if item.results is not None:
     response_payload = {
-        "results": [
-            {
-                "file_id": (
-                    getattr(result, "file_id", None)
-                    if not isinstance(result, dict)
-                    else result.get("file_id")
-                ),
-                "filename": (
-                    getattr(result, "filename", None)
-                    if not isinstance(result, dict)
-                    else result.get("filename")
-                ),
-                "score": (
-                    getattr(result, "score", None)
-                    if not isinstance(result, dict)
-                    else result.get("score")
-                ),
-            }
-            for result in results
-        ]
+        "results": [result.model_dump() for result in item.results]
     }
```
🧩 Analysis chain
🏁 Script executed:
# First, let's read the specific file and understand the context around lines 123-127
cat -n src/app/endpoints/query_v2.py | head -150 | tail -50
Repository: lightspeed-core/lightspeed-stack
Length of output: 208
🏁 Script executed:
# Also check the full file structure to understand what we're working with
wc -l src/app/endpoints/query_v2.py
Repository: lightspeed-core/lightspeed-stack
Length of output: 109
🏁 Script executed:
# Let's search for the item.results usage and understand its type
rg "item\.results" src/app/endpoints/query_v2.py -B 3 -A 3Repository: lightspeed-core/lightspeed-stack
Length of output: 717
🏁 Script executed:
# Search for model_dump usage in this file to understand patterns
rg "model_dump" src/app/endpoints/query_v2.py -B 2 -A 2Repository: lightspeed-core/lightspeed-stack
Length of output: 267
🏁 Script executed:
# Look for any dict handling with results in this file
rg "results.*dict|dict.*results" src/app/endpoints/query_v2.py -B 2 -A 2Repository: lightspeed-core/lightspeed-stack
Length of output: 58
🏁 Script executed:
# Check type annotations and imports to understand data types
head -50 src/app/endpoints/query_v2.py
Repository: lightspeed-core/lightspeed-stack
Length of output: 1680
🏁 Script executed:
# Let's get more context around the code and understand the full function
sed -n '100,150p' src/app/endpoints/query_v2.py
Repository: lightspeed-core/lightspeed-stack
Length of output: 2045
🏁 Script executed:
# Search for more result processing patterns to find where dicts are allegedly handled
rg "results" src/app/endpoints/query_v2.py -B 2 -A 2 | head -100Repository: lightspeed-core/lightspeed-stack
Length of output: 2144
🏁 Script executed:
# Check if there's any handling that creates dicts from results
rg '"results"' src/app/endpoints/query_v2.py -B 3 -A 3Repository: lightspeed-core/lightspeed-stack
Length of output: 862
🏁 Script executed:
# Let's look for the type definition - check if there are any type stubs or if we can infer from usage
rg "OpenAIResponseOutputMessageFileSearchToolCall" . -B 2 -A 2Repository: lightspeed-core/lightspeed-stack
Length of output: 806
🏁 Script executed:
# Check if there's a models or type definition file in the project
find . -name "*.py" -type f | grep -E "(models|types)" | head -20Repository: lightspeed-core/lightspeed-stack
Length of output: 757
🏁 Script executed:
# Let's see the full context of where dicts are handled
sed -n '550,650p' src/app/endpoints/query_v2.py
Repository: lightspeed-core/lightspeed-stack
Length of output: 4838
🏁 Script executed:
# Get the full section of the dict handling code
rg -A 10 "Handle both object and dict access" src/app/endpoints/query_v2.pyRepository: lightspeed-core/lightspeed-stack
Length of output: 1262
🏁 Script executed:
# Let's understand when results might be dicts vs objects
sed -n '540,580p' src/app/endpoints/query_v2.py
Repository: lightspeed-core/lightspeed-stack
Length of output: 2027
🏁 Script executed:
# Check the extraction_text_from_response_output_item function to understand the pattern better
ast-grep --pattern 'def extract_text_from_response_output_item($_) {
$$$
}'
Repository: lightspeed-core/lightspeed-stack
Length of output: 58
Handle dict results to avoid AttributeError.
result.model_dump() will fail if item.results contains dicts. Other parts of this same file already handle this inconsistency (e.g., lines ~540-580 use isinstance(result, dict) checks for the same results field), and the token usage function documents that llama_stack returns both dicts and objects due to internal type inconsistencies. Use a robust check to handle both cases:
🐛 Robust result handling
- response_payload = {
- "results": [result.model_dump() for result in item.results]
- }
+ response_payload = {
+ "results": [
+ result if isinstance(result, dict) else result.model_dump()
+ for result in item.results
+ ]
+ }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
response_payload: Optional[dict[str, Any]] = None
if item.results is not None:
    response_payload = {
        "results": [
            result if isinstance(result, dict) else result.model_dump()
            for result in item.results
        ]
    }
```
🤖 Prompt for AI Agents
In `@src/app/endpoints/query_v2.py` around lines 123 - 127, The current
construction of response_payload assumes every entry in item.results has
model_dump(), causing AttributeError for dict entries; update the logic that
builds response_payload (where response_payload is set from item.results) to
iterate results and for each result use it directly if isinstance(result, dict)
otherwise call result.model_dump(); ensure this change is applied to the block
that creates the {"results": [...] } payload so both dicts and model objects are
handled safely.
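To make the failure mode concrete, here is a self-contained sketch (FileSearchResult is a stand-in class, not the real llama_stack type):

```python
from pydantic import BaseModel


class FileSearchResult(BaseModel):  # stand-in only; the real results come from llama_stack
    file_id: str
    score: float


def dump_result(result) -> dict:
    # Mirrors the suggested fix: dicts pass through, model objects are serialized
    return result if isinstance(result, dict) else result.model_dump()


print(dump_result(FileSearchResult(file_id="f1", score=0.9)))  # {'file_id': 'f1', 'score': 0.9}
print(dump_result({"file_id": "f2", "score": 0.5}))            # returned unchanged
# Calling {"file_id": "f2"}.model_dump() directly would raise AttributeError,
# which is exactly what the unguarded list comprehension does for dict entries.
```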
```diff
+    content = item.error if item.error else (item.output if item.output else "")
+
     return ToolCallSummary(
-        id=str(getattr(output_item, "id")),
-        name=getattr(output_item, "name", "mcp_call"),
+        id=item.id,
+        name=item.name,
         args=args,
         type="mcp_call",
     ), ToolResultSummary(
-        id=str(getattr(output_item, "id")),
-        status=str(getattr(output_item, "status", None)),
-        content=getattr(output_item, "output", ""),
+        id=item.id,
+        status="success" if item.error is None else "failure",
+        content=content,
         type="mcp_call",
```
🧩 Analysis chain
🏁 Script executed:
# Find the ToolResultSummary definition
rg "class ToolResultSummary" -A 15Repository: lightspeed-core/lightspeed-stack
Length of output: 1059
🏁 Script executed:
# Check the context around line 165-176 in query_v2.py
cat -n src/app/endpoints/query_v2.py | sed -n '155,185p'
Repository: lightspeed-core/lightspeed-stack
Length of output: 1287
🏁 Script executed:
# Find what item's type is
rg "def.*item.*:" src/app/endpoints/query_v2.py -B 5 -A 1 | head -40Repository: lightspeed-core/lightspeed-stack
Length of output: 58
🏁 Script executed:
# Check if json is already imported
grep -n "^import json\|^from json" src/app/endpoints/query_v2.pyRepository: lightspeed-core/lightspeed-stack
Length of output: 89
🏁 Script executed:
# Search for how ToolResultSummary is used downstream
rg "ToolResultSummary" src/app/endpoints/query_v2.py -B 2 -A 2Repository: lightspeed-core/lightspeed-stack
Length of output: 1964
🏁 Script executed:
# Find OpenAIResponseOutputMessageMCPCall definition to see item.error and item.output types
rg "class OpenAIResponseOutputMessageMCPCall" -A 20Repository: lightspeed-core/lightspeed-stack
Length of output: 58
🏁 Script executed:
# Also check if there are type hints for error and output fields
rg "OpenAIResponseOutputMessageMCPCall\|MCPCall" --type py -A 15 | head -60Repository: lightspeed-core/lightspeed-stack
Length of output: 58
🏁 Script executed:
# Check the actual Llama Stack client types
fd "llama_stack" --type f -name "*.py" | xargs rg "MCPCall" -A 5 2>/dev/null | head -50Repository: lightspeed-core/lightspeed-stack
Length of output: 1169
🏁 Script executed:
# Let's search for imports related to OpenAI response types
rg "from.*import.*OpenAIResponseOutput" -A 2Repository: lightspeed-core/lightspeed-stack
Length of output: 58
🏁 Script executed:
# Check the imports at the top of query_v2.py to see where types come from
head -50 src/app/endpoints/query_v2.py
Repository: lightspeed-core/lightspeed-stack
Length of output: 1680
🏁 Script executed:
# Search for Llama Stack client imports to understand the types
rg "from llama_stack" src/app/endpoints/query_v2.py -A 10Repository: lightspeed-core/lightspeed-stack
Length of output: 861
🏁 Script executed:
# Check if there's any documentation or usage that shows item.output/item.error types
rg "item\.output|item\.error" src/app/endpoints/query_v2.py -B 5 -A 2Repository: lightspeed-core/lightspeed-stack
Length of output: 693
🏁 Script executed:
# Let's check what ToolResultSummary.content is expecting and how it's used
rg "ToolResultSummary" src/ -A 3 -B 1 | grep -A 10 "content"Repository: lightspeed-core/lightspeed-stack
Length of output: 2595
🏁 Script executed:
# Check downstream usage - see if content is serialized to JSON later
rg "tool_results\|ToolResultSummary" src/app/endpoints/query_v2.py -A 3 | tail -30Repository: lightspeed-core/lightspeed-stack
Length of output: 58
Serialize MCP output to string to match ToolResultSummary.content field type.
The ToolResultSummary.content field is typed as str. If item.error or item.output are dicts or lists from the Llama Stack API, they will be passed directly without serialization, causing type mismatches. This is inconsistent with other tool result types in the same function (web search, MCP list tools, and approval requests all use json.dumps() for structured content).
🧾 Serialize non-string outputs
- content = item.error if item.error else (item.output if item.output else "")
+ raw_content = item.error if item.error is not None else item.output
+ if raw_content is None:
+ content = ""
+ elif isinstance(raw_content, str):
+ content = raw_content
+ else:
+ content = json.dumps(raw_content)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
raw_content = item.error if item.error is not None else item.output
if raw_content is None:
    content = ""
elif isinstance(raw_content, str):
    content = raw_content
else:
    content = json.dumps(raw_content)
return ToolCallSummary(
    id=item.id,
    name=item.name,
    args=args,
    type="mcp_call",
), ToolResultSummary(
    id=item.id,
    status="success" if item.error is None else "failure",
    content=content,
    type="mcp_call",
```
🤖 Prompt for AI Agents
In `@src/app/endpoints/query_v2.py` around lines 165 - 176, The
ToolResultSummary.content currently assigns item.error or item.output directly
(via the local variable content), which can be dict/list and violates the str
type; update the code that builds content (used when constructing
ToolResultSummary in this block) to serialize non-string values with json.dumps
(e.g., if not isinstance(content, str): content = json.dumps(content)) before
passing to ToolResultSummary so item.error/item.output are always strings;
ensure to import json if not already and preserve the existing
"success"/"failure" logic and the surrounding ToolCallSummary/ToolResultSummary
construction.
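For reference, a simplified stand-in for ToolResultSummary (field names taken from the diff above; the real model lives in this repository and may differ) shows why serializing first keeps content a plain JSON string:

```python
import json

from pydantic import BaseModel


class ToolResultSummarySketch(BaseModel):  # simplified stand-in, not the project's class
    id: str
    status: str
    content: str
    type: str


mcp_output = {"rows": [1, 2, 3]}  # hypothetical structured MCP tool output

summary = ToolResultSummarySketch(
    id="call_1",
    status="success",
    content=json.dumps(mcp_output),  # always a valid JSON string, never a raw dict
    type="mcp_call",
)
```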
```python
elif event_type == "response.incomplete":
    error_response = InternalServerErrorResponse.query_failed(
        "An unexpected error occurred while processing the request."
    )
    logger.error("Error while obtaining answer for user question")
    yield format_stream_data(
        {"event": "error", "data": {**error_response.detail.model_dump()}}
    )
    return

# Failed response - emit error with custom cause from error message
elif event_type == "response.failed":
    failed_chunk = cast(OpenAIResponseObjectStreamResponseFailed, chunk)
    latest_response_object = failed_chunk.response
    error_message = (
        failed_chunk.response.error.message
        if failed_chunk.response.error
        else "An unexpected error occurred while processing the request."
    )
    error_response = InternalServerErrorResponse.query_failed(error_message)
    logger.error("Error while obtaining answer for user question")
    yield format_stream_data(
        {"event": "error", "data": {**error_response.detail.model_dump()}}
    )
    return
```
🧩 Analysis chain
🏁 Script executed:
# Find the streaming_query_v2.py file and understand imports
fd streaming_query_v2 --type f
Repository: lightspeed-core/lightspeed-stack
Length of output: 167
🏁 Script executed:
# Look at the imports and type definitions at the top of the file
head -80 src/app/endpoints/streaming_query_v2.py
Repository: lightspeed-core/lightspeed-stack
Length of output: 2756
🏁 Script executed:
# Search for InternalServerErrorResponse class definition
rg "class InternalServerErrorResponse" -A 20Repository: lightspeed-core/lightspeed-stack
Length of output: 1389
🏁 Script executed:
# Search for query_failed method signature
rg "def query_failed" -A 5 -B 2Repository: lightspeed-core/lightspeed-stack
Length of output: 515
🏁 Script executed:
# Search for OpenAIResponseObjectStreamResponseFailed type definition
rg "OpenAIResponseObjectStreamResponseFailed" --type py -A 5Repository: lightspeed-core/lightspeed-stack
Length of output: 1197
🏁 Script executed:
# Check if there's any type hint or definition for OpenAIResponseObjectStreamResponseFailed in the repo
rg "OpenAIResponseObjectStreamResponseFailed" -A 10 -B 5 --type pyRepository: lightspeed-core/lightspeed-stack
Length of output: 2831
🏁 Script executed:
# Look for any usage patterns of .error.message in the codebase
rg "\.error\.message" -B 3 -A 3Repository: lightspeed-core/lightspeed-stack
Length of output: 748
🏁 Script executed:
# Check the actual code around lines 243-267 to see the full context
sed -n '230,275p' src/app/endpoints/streaming_query_v2.py
Repository: lightspeed-core/lightspeed-stack
Length of output: 2167
🌐 Web query:
OpenAIResponseObjectStreamResponseFailed llama_stack error message type nullable None
💡 Result:
Short answer — it means the code converted an OpenAI Responses stream item to a Llama Stack message but couldn't map the item's role to a message class, so message_type was None and the code failed.
Why it happens
- Llama Stack maps roles to classes like "user", "system", "assistant", "developer". If the streamed response contains a different or unexpected role (or a nullable/None role), _get_message_type_by_role returns None and the conversion raises/fails. [1]
How to fix
- Inspect the streamed item that caused the failure (log input_item.role).
- Ensure the Responses stream uses one of the expected roles ("user", "system", "assistant", "developer").
- If you must accept other roles, update the mapping in _get_message_type_by_role to include your role -> message class, or add a safe fallback (e.g., treat unknown roles as user or skip). Example change:
- before: role_to_type = { "user": OpenAIUserMessageParam, ... }
- after (fallback): role_to_type.get(role, OpenAIUserMessageParam)
- Alternatively, add a clearer guard and error message when role is None to make debugging easier.
Relevant reference
- Llama Stack OpenAI responses conversion shows exactly this mapping and the check that raises when message_type is None. [1]
If you want, paste the exact stacktrace or the streamed JSON payload and I’ll point to the exact field and a concrete patch.
🏁 Script executed:
# Look for test files that might show error structures
find . -name "*test*streaming*query*" -type fRepository: lightspeed-core/lightspeed-stack
Length of output: 180
🏁 Script executed:
# Check requirements or pyproject to understand llama_stack version
rg "llama.*stack" --type toml --type txt -A 2Repository: lightspeed-core/lightspeed-stack
Length of output: 1679
🏁 Script executed:
# Search for other usages of .error.message in the codebase to see patterns
rg "error\.message" -B 5 -A 2 --type pyRepository: lightspeed-core/lightspeed-stack
Length of output: 900
🏁 Script executed:
# Check if there are type annotations or imports that show Optional types for error
rg "Optional.*message" --type pyRepository: lightspeed-core/lightspeed-stack
Length of output: 246
🏁 Script executed:
# Check the test file to see error structures
cat tests/unit/app/endpoints/test_streaming_query_v2.py
Repository: lightspeed-core/lightspeed-stack
Length of output: 24359
🏁 Script executed:
# Search for any mock or fixture definitions of OpenAIResponseObjectStreamResponseFailed
rg "OpenAIResponseObjectStreamResponseFailed" -A 10 tests/Repository: lightspeed-core/lightspeed-stack
Length of output: 58
🏁 Script executed:
# Check if there's any documentation or type checking configuration
rg "error.*message" -B 2 -A 2 tests/Repository: lightspeed-core/lightspeed-stack
Length of output: 4494
🏁 Script executed:
# Search for any Python type hints or pydantic models that might show error structure
rg "error.*message" --type py -B 3 -A 3Repository: lightspeed-core/lightspeed-stack
Length of output: 18755
🏁 Script executed:
# Check if there are any existing error handling patterns that check for None message
rg "message.*None" --type py -B 2 -A 2Repository: lightspeed-core/lightspeed-stack
Length of output: 16923
🏁 Script executed:
# Try to find the actual definition or imports from llama_stack
rg "from llama_stack" -A 5 --type py | head -100Repository: lightspeed-core/lightspeed-stack
Length of output: 5398
🌐 Web query:
llama_stack 0.3.5 OpenAIResponseObjectStreamResponseFailed error message nullable Optional
💡 Result:
Short answer: change OpenAIResponseError.message from a required str to an optional/nullable field (e.g. str | None or Optional[str]) in the OpenAI responses schema, and keep OpenAIResponseObject.error typed as OpenAIResponseError | None (it already is). See the llama-stack source where OpenAIResponseError and OpenAIResponseObject are defined. [1][2]
What to change (concise patch)
- In llama_stack/apis/agents/openai_responses.py change:
  from:
      code: str
      message: str
  to:
      code: str
      message: str | None = None
  (or message: Optional[str] = None with a typing import)
Why: OpenAIResponseObject.error is already optional (OpenAIResponseError | None) but the error.message field is currently declared as a required str in the llama-stack schema; making message nullable matches cases where an error object may be present but message is absent. See definitions in the repo. [1][2]
If you prefer not to modify source, alternative:
- Upgrade to a newer llama-stack version if a fix/schema change exists upstream, or override/validate responses on the client side to accept missing message.
Sources
- OpenAI responses schema in llama-stack (OpenAIResponseError / OpenAIResponseObject). [1][2]
References
[1] llama-stack openai_responses.py — OpenAIResponseError definition (message currently str).
[2] llama-stack openai_responses.py — OpenAIResponseObject (error: OpenAIResponseError | None).
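A minimal sketch of the schema change described above (illustrative only; the real OpenAIResponseError class lives in llama-stack, not in this repository):

```python
from typing import Optional

from pydantic import BaseModel


class OpenAIResponseErrorSketch(BaseModel):
    # message becomes nullable, so an error object can exist without a message
    code: str
    message: Optional[str] = None
```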
Add defensive check for None message in error handling.
When failed_chunk.response.error exists, failed_chunk.response.error.message can be None (despite the type annotation suggesting otherwise in llama_stack 0.3.5), which would violate the query_failed(cause: str) contract and cause a runtime error. Add an additional truthiness check for the message field:
error_message = (
failed_chunk.response.error.message
if failed_chunk.response.error
and failed_chunk.response.error.message
else "An unexpected error occurred while processing the request."
)
🤖 Prompt for AI Agents
In `@src/app/endpoints/streaming_query_v2.py` around lines 243 - 267, The error
handling for event_type "response.failed" must guard against
failed_chunk.response.error.message being None before calling
InternalServerErrorResponse.query_failed(cause: str); update the branch that
sets error_message (in the "response.failed" branch using failed_chunk /
latest_response_object) to check message truthiness (e.g., if
failed_chunk.response.error and failed_chunk.response.error.message) and fall
back to the generic string before passing to
InternalServerErrorResponse.query_failed, and keep the existing logging/yield
behavior.
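One way to express the suggested guard is a small helper, sketched below (the helper itself is hypothetical; the fallback string matches the one used in the branch above):

```python
DEFAULT_CAUSE = "An unexpected error occurred while processing the request."


def error_cause(error) -> str:
    # Treat a missing error object and a None/empty message the same way
    if error is not None and getattr(error, "message", None):
        return error.message
    return DEFAULT_CAUSE
```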
```diff
 @classmethod
-def query_failed(cls, backend_url: str) -> "InternalServerErrorResponse":
+def query_failed(cls, cause: str) -> "InternalServerErrorResponse":
     """
-    Create an InternalServerErrorResponse representing a failed query to an external backend.
+    Create an InternalServerErrorResponse representing a failed query.
     Parameters:
-        backend_url (str): The backend URL included in the error cause message.
+        cause (str): The error cause message.
     Returns:
         InternalServerErrorResponse: An error response with response "Error
-        while processing query" and cause "Failed to call backend:
-        {backend_url}".
+        while processing query" and the provided cause.
     """
     return cls(
         response="Error while processing query",
-        cause=f"Failed to call backend: {backend_url}",
+        cause=cause,
     )
```
🧩 Analysis chain
🏁 Script executed:
rg -n --type=py 'query_failed\(' -A2 -B1
Repository: lightspeed-core/lightspeed-stack
Length of output: 1987
🏁 Script executed:
cat -n src/models/responses.py | sed -n '1887,1902p'
Repository: lightspeed-core/lightspeed-stack
Length of output: 712
Update docstring section from Parameters: to Args: to follow Google Python conventions.
The query_failed method refactoring is correct—all call sites have been updated to pass the new cause parameter. However, the docstring uses Parameters: when it should use Args: per Google Python conventions specified in the coding guidelines.
Update line 1892 from:
Parameters:
to:
Args:🤖 Prompt for AI Agents
In `@src/models/responses.py` around lines 1887 - 1902, Update the docstring for
the classmethod query_failed on InternalServerErrorResponse to use Google-style
docstrings: change the section header "Parameters:" to "Args:" and keep the rest
of the docstring content (cause description and return description) unchanged so
the docstring follows the project's Google Python conventions.
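For reference, the docstring with the Args: header would read roughly as follows (the body is unchanged from the diff above):

```python
@classmethod
def query_failed(cls, cause: str) -> "InternalServerErrorResponse":
    """
    Create an InternalServerErrorResponse representing a failed query.

    Args:
        cause (str): The error cause message.

    Returns:
        InternalServerErrorResponse: An error response with response "Error
        while processing query" and the provided cause.
    """
    return cls(
        response="Error while processing query",
        cause=cause,
    )
```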
```python
async def create_violation_stream(
    message: str,
    shield_model: Optional[str] = None,
) -> AsyncIterator[OpenAIResponseObjectStream]:
    """Generate a minimal streaming response for cases where input is blocked by a shield.

    This yields only the essential streaming events to indicate that the input was rejected.
    Dummy item identifiers are used solely for protocol compliance and are not used later.
    """
    response_id = "resp_shield_violation"

    # Create the response object with empty output at the beginning
    response_obj = OpenAIResponseObject(
        id=response_id,
        created_at=0,  # not used
        model=shield_model or "shield",
        output=[],
        status="in_progress",
    )
    yield OpenAIResponseObjectStreamResponseCreated(response=response_obj)

    # Triggers empty initial token
    yield OpenAIResponseObjectStreamResponseContentPartAdded(
        content_index=0,
        response_id=response_id,
        item_id="msg_shield_violation_1",
        output_index=0,
        part=OpenAIResponseContentPartOutputText(text=""),
        sequence_number=0,
    )

    # Text delta
    yield OpenAIResponseObjectStreamResponseOutputTextDelta(
        content_index=1,
        delta=message,
        item_id="msg_shield_violation_2",
        output_index=1,
        sequence_number=1,
    )

    # Output text done
    yield OpenAIResponseObjectStreamResponseOutputTextDone(
        content_index=2,
        text=message,
        item_id="msg_shield_violation_3",
        output_index=2,
        sequence_number=2,
    )

    # Fill the output when message is completed
    response_obj.output = [
        OpenAIResponseMessage(
            id="msg_shield_violation_4",
            content=[OpenAIResponseOutputMessageContentOutputText(text=message)],
            role="assistant",
            status="completed",
        )
    ]
    # Update status to completed
```
🧩 Analysis chain
🏁 Script executed:
# First, let's check if the file exists and read the relevant section
cat -n src/utils/query.py | head -150
Repository: lightspeed-core/lightspeed-stack
Length of output: 4922
🏁 Script executed:
# Let's understand the type definitions for these OpenAI response classes
rg "class OpenAIResponseObjectStreamResponseOutputTextDelta|class OpenAIResponseObjectStreamResponseContentPartAdded|class OpenAIResponseObjectStreamResponseOutputTextDone" -A 15Repository: lightspeed-core/lightspeed-stack
Length of output: 58
🏁 Script executed:
# Let's find usages of create_violation_stream to understand how it's consumed
rg "create_violation_stream" -B 2 -A 2Repository: lightspeed-core/lightspeed-stack
Length of output: 1067
🏁 Script executed:
# Let's examine the usage in streaming_query_v2.py
cat -n src/app/endpoints/streaming_query_v2.py | grep -A 20 -B 20 "create_violation_stream"
Repository: lightspeed-core/lightspeed-stack
Length of output: 3593
🏁 Script executed:
# Look for other stream creation examples to see what pattern is used
rg "OpenAIResponseObjectStreamResponse" src/ -B 2 -A 10 | head -100Repository: lightspeed-core/lightspeed-stack
Length of output: 6023
🏁 Script executed:
# Let's see how the stream events are actually processed and if indices are used
rg "output_index|content_index" src/app/endpoints/streaming_query_v2.py -B 3 -A 3Repository: lightspeed-core/lightspeed-stack
Length of output: 58
🏁 Script executed:
# Search for any usage of output_index or content_index in the codebase
rg "output_index|content_index" src/ --type pyRepository: lightspeed-core/lightspeed-stack
Length of output: 336
🏁 Script executed:
# Let's search for any documentation or examples related to OpenAI response streams
rg "output_index|content_index" . -B 5 -A 5 --type md --type txt 2>/dev/null || trueRepository: lightspeed-core/lightspeed-stack
Length of output: 58
🏁 Script executed:
# Let's check if there are any comments or documentation about the Llama Stack types
# Look for any imports or references that might explain the semantics
rg "from llama_stack" src/ -B 2 -A 10 | head -80Repository: lightspeed-core/lightspeed-stack
Length of output: 3943
Keep output_index/content_index consistent for a single output item.
The stream currently emits output_index and content_index as 0 → 1 → 2 even though only one output message is produced. Since response_obj.output contains exactly one element, all events should reference output_index=0 and content_index=0. Clients expecting these indices to map to array positions in the response will encounter semantic inconsistencies.
🔧 Suggested fix
- yield OpenAIResponseObjectStreamResponseOutputTextDelta(
- content_index=1,
- delta=message,
- item_id="msg_shield_violation_2",
- output_index=1,
- sequence_number=1,
- )
+ yield OpenAIResponseObjectStreamResponseOutputTextDelta(
+ content_index=0,
+ delta=message,
+ item_id="msg_shield_violation_1",
+ output_index=0,
+ sequence_number=1,
+ )
@@
- yield OpenAIResponseObjectStreamResponseOutputTextDone(
- content_index=2,
- text=message,
- item_id="msg_shield_violation_3",
- output_index=2,
- sequence_number=2,
- )
+ yield OpenAIResponseObjectStreamResponseOutputTextDone(
+ content_index=0,
+ text=message,
+ item_id="msg_shield_violation_1",
+ output_index=0,
+ sequence_number=2,
+ )
🤖 Prompt for AI Agents
In `@src/utils/query.py` around lines 60 - 118, create_violation_stream emits
events with content_index/output_index 0→1→2 while response_obj.output has a
single element; update the emitted events so all references point to the single
output at index 0. Specifically, in create_violation_stream change the
OpenAIResponseObjectStreamResponseContentPartAdded,
OpenAIResponseObjectStreamResponseOutputTextDelta, and
OpenAIResponseObjectStreamResponseOutputTextDone yields to use content_index=0
and output_index=0 (keep their item_id/sequence_number values as needed), and
ensure the final response_obj.output remains the single element referenced by
those indices.
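A rough sketch of the client-side assumption this comment is protecting (hypothetical consumer code, not from this repository):

```python
# The final response carries exactly one output item, so a consumer that maps
# output_index to a position in response.output only has index 0 available.
final_output = ["shield violation message"]  # stands in for response_obj.output, len == 1


def text_for_event(output_index: int) -> str:
    return final_output[output_index]


text_for_event(0)  # works
# text_for_event(2) raises IndexError, which is why the delta/done events
# should also reference output_index=0 rather than 1 and 2.
```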
tisnik left a comment:
interesting change, LGTM
Description
This PR fixes compatibility issues between LCORE and OLS in the streaming query SSE format.
Type of change
Tools used to create PR
Identify any AI code assistants used in this PR (for transparency and review context)
Related Tickets & Documents
Checklist before requesting a review
Testing
Summary by CodeRabbit
Release Notes
New Features
Bug Fixes
Refactor
✏️ Tip: You can customize this high-level summary in your review settings.