
Conversation

@asimurka
Contributor

@asimurka asimurka commented Jan 16, 2026

Description

This PR fixes compatibility issues between LCORE and OLS in the streaming query SSE format.

Type of change

  • Refactor
  • New feature
  • Bug fix
  • CVE fix
  • Optimization
  • Documentation Update
  • Configuration Update
  • Bump-up service version
  • Bump-up dependent library
  • Bump-up library or tool used for development (does not change the final image)
  • CI configuration change
  • Konflux configuration change
  • Unit tests improvement
  • Integration tests improvement
  • End to end tests improvement

Tools used to create PR

Identify any AI code assistants used in this PR (for transparency and review context)

  • Assisted-by: Cursor

Related Tickets & Documents

Checklist before requesting a review

  • I have performed a self-review of my code.
  • PR has passed all pre-merge test jobs.
  • If it is a core feature, I have added thorough tests.

Testing

  • Please provide detailed steps to perform tests related to this code change.
  • How were the fix/results from this change verified? Please provide relevant screenshots or results.

Summary by CodeRabbit

Release Notes

  • New Features

    • Enhanced support for MCP tool calls, approval workflows, and tool discovery operations
    • Improved error message context for query failures
  • Bug Fixes

    • Improved parsing of tool call arguments with robust fallbacks
    • Better handling of shield violations and API failures
  • Refactor

    • Consolidated tool call event streaming for improved reliability
    • Streamlined error response handling in streaming queries


@coderabbitai
Contributor

coderabbitai bot commented Jan 16, 2026

Walkthrough

This PR refactors tool call and response data handling across query and streaming endpoints, introducing stronger typing for OpenAI response outputs, consolidating tool call summary generation with multiple output type branches, converting error streaming from async to sync, and adding utility functions for argument parsing and shield violation streaming.
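
As an illustration of the async-to-sync conversion mentioned above, a minimal sketch follows. The parameter names, payload shape, and SSE formatting here are assumptions for illustration only, not the actual stream_http_error implementation in streaming_query.py:

import json
from typing import Any, Iterator


def stream_http_error(status_code: int, detail: dict[str, Any]) -> Iterator[str]:
    """Yield a single SSE error event, then stop (hypothetical signature).

    Nothing in the body awaits, so a plain generator is enough and callers
    no longer need `async for` to drain it.
    """
    payload = {"event": "error", "data": {"status": status_code, **detail}}
    yield f"data: {json.dumps(payload)}\n\n"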

Changes

Grouped by cohort, with affected file(s) and a summary of each:

  • Query and Response Type Refactoring (src/app/endpoints/query_v2.py, src/models/responses.py, src/utils/types.py): Refactored _build_tool_call_summary with stricter input typing (OpenAIResponseOutput) and expanded handling for the function_call, file_search_call, web_search_call, mcp_call, mcp_list_tools, and mcp_approval_request/response output types. Changed the InternalServerErrorResponse.query_failed parameter from backend_url to cause. Narrowed ToolResultSummary.content from Any to str.
  • Streaming Event Generation Refactor (src/app/endpoints/streaming_query_v2.py, src/app/endpoints/streaming_query.py): Converted stream_http_error from an async generator to a sync iterator. Refactored streaming_query_v2 to use the new event emission system (LLM_TOKEN_EVENT, LLM_TOOL_CALL_EVENT, LLM_TOOL_RESULT_EVENT via stream_event). Replaced granular tool argument delta/done events with consolidated OutputItemDone handling. Added error channel handling for response.incomplete and response.failed. Removed the local create_violation_stream in favor of the imported utility.
  • Utility Functions and Shield Handling (src/utils/query.py, src/utils/shields.py): Added a parse_arguments_string utility for JSON argument parsing with fallbacks (see the sketch after this list). Added a create_violation_stream async generator to emit the shield violation streaming sequence. Broadened exception handling in run_shield_moderation to catch BadRequestError and ValueError, returning a blocked result on violation detection.
  • Integration and Unit Tests (tests/integration/endpoints/test_query_v2_integration.py, tests/unit/app/endpoints/test_query_v2.py, tests/unit/app/endpoints/test_streaming_query_v2.py, tests/unit/models/responses/test_*.py): Updated mocks to include new tool attributes (call_id, description, input_schema, model_dump). Changed function_call arguments from dict to JSON string format. Updated test expectations for the file_search_call id field and for ToolResultSummary content as a JSON string. Replaced format_stream_data with stream_event in streaming tests and adjusted event sequence expectations.
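
A minimal sketch of what a helper like parse_arguments_string can look like is shown below. It is hypothetical: only the function name and the JSON-with-fallbacks intent come from the summary above, and the actual implementation in src/utils/query.py may differ in fallback behavior and return type.

import json
from typing import Any


def parse_arguments_string(arguments: str) -> dict[str, Any]:
    """Parse a tool-call arguments string into a dict, with fallbacks (hypothetical sketch)."""
    text = arguments.strip()
    if not text:
        return {}
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        # Fallback: keep the raw string so malformed arguments are not lost.
        return {"raw_arguments": text}
    # Non-object JSON payloads are also preserved under a single key.
    return parsed if isinstance(parsed, dict) else {"raw_arguments": text}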

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested labels

ok-to-test

Suggested reviewers

  • tisnik
  • eranco74
  • jrobertboos
🚥 Pre-merge checks | ✅ 3

✅ Passed checks (3 passed)

  • Description Check: ✅ Passed. Check skipped: CodeRabbit's high-level summary is enabled.
  • Title Check: ✅ Passed. The title clearly and accurately summarizes the main refactoring objective of the PR, which involves restructuring query event parsing across multiple modules.
  • Docstring Coverage: ✅ Passed. Docstring coverage is 100.00%, which is sufficient; the required threshold is 80.00%.




Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/utils/types.py (1)

174-199: Ensure tool result content stays valid JSON when responses contain complex types.

content_to_str falls back to str() for unhandled types, which yields Python repr format with single quotes for dicts. Since ToolResultSummary.content expects a string and downstream code in _extract_rag_chunks_from_response parses JSON, consider serializing dict/list via json.dumps before assignment to preserve valid JSON.

Suggested adjustment
-        response_content = content_to_str(resp.content) if resp else None
+        if resp:
+            if isinstance(resp.content, (dict, list)):
+                response_content = json.dumps(resp.content)
+            else:
+                response_content = content_to_str(resp.content)
+        else:
+            response_content = None
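
For context, str() on a dict yields Python repr with single quotes, which json.loads cannot parse, while json.dumps produces valid JSON (the values below are illustrative):

import json

chunk = {"file_id": "doc-1", "score": 0.92}

str(chunk)         # "{'file_id': 'doc-1', 'score': 0.92}"  -> not valid JSON
json.dumps(chunk)  # '{"file_id": "doc-1", "score": 0.92}'  -> parseable by json.loads downstream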
🤖 Fix all issues with AI agents
In `@src/app/endpoints/query_v2.py`:
- Around line 165-176: The ToolResultSummary.content currently assigns
item.error or item.output directly (via the local variable content), which can
be dict/list and violates the str type; update the code that builds content
(used when constructing ToolResultSummary in this block) to serialize non-string
values with json.dumps (e.g., if not isinstance(content, str): content =
json.dumps(content)) before passing to ToolResultSummary so
item.error/item.output are always strings; ensure to import json if not already
and preserve the existing "success"/"failure" logic and the surrounding
ToolCallSummary/ToolResultSummary construction.
- Around line 123-127: The current construction of response_payload assumes
every entry in item.results has model_dump(), causing AttributeError for dict
entries; update the logic that builds response_payload (where response_payload
is set from item.results) to iterate results and for each result use it directly
if isinstance(result, dict) otherwise call result.model_dump(); ensure this
change is applied to the block that creates the {"results": [...] } payload so
both dicts and model objects are handled safely.

In `@src/app/endpoints/streaming_query_v2.py`:
- Around line 243-267: The error handling for event_type "response.failed" must
guard against failed_chunk.response.error.message being None before calling
InternalServerErrorResponse.query_failed(cause: str); update the branch that
sets error_message (in the "response.failed" branch using failed_chunk /
latest_response_object) to check message truthiness (e.g., if
failed_chunk.response.error and failed_chunk.response.error.message) and fall
back to the generic string before passing to
InternalServerErrorResponse.query_failed, and keep the existing logging/yield
behavior.

In `@src/models/responses.py`:
- Around line 1887-1902: Update the docstring for the classmethod query_failed
on InternalServerErrorResponse to use Google-style docstrings: change the
section header "Parameters:" to "Args:" and keep the rest of the docstring
content (cause description and return description) unchanged so the docstring
follows the project's Google Python conventions.

In `@src/utils/query.py`:
- Around line 60-118: create_violation_stream emits events with
content_index/output_index 0→1→2 while response_obj.output has a single element;
update the emitted events so all references point to the single output at index
0. Specifically, in create_violation_stream change the
OpenAIResponseObjectStreamResponseContentPartAdded,
OpenAIResponseObjectStreamResponseOutputTextDelta, and
OpenAIResponseObjectStreamResponseOutputTextDone yields to use content_index=0
and output_index=0 (keep their item_id/sequence_number values as needed), and
ensure the final response_obj.output remains the single element referenced by
those indices.
🧹 Nitpick comments (1)
src/app/endpoints/query_v2.py (1)

109-116: Guard against None arguments before parsing.
If Llama Stack can emit null/missing arguments, parse_arguments_string will raise on .strip(). Consider a safe fallback.

🔧 Suggested defensive guard
-                args=parse_arguments_string(item.arguments),
+                args=parse_arguments_string(item.arguments or ""),
...
-        args = parse_arguments_string(item.arguments)
+        args = parse_arguments_string(item.arguments or "")
...
-        args = parse_arguments_string(item.arguments)
+        args = parse_arguments_string(item.arguments or "")

Also applies to: 162-163, 212-213

Comment on lines +123 to 127
        response_payload: Optional[dict[str, Any]] = None
        if item.results is not None:
            response_payload = {
-                "results": [
-                    {
-                        "file_id": (
-                            getattr(result, "file_id", None)
-                            if not isinstance(result, dict)
-                            else result.get("file_id")
-                        ),
-                        "filename": (
-                            getattr(result, "filename", None)
-                            if not isinstance(result, dict)
-                            else result.get("filename")
-                        ),
-                        "score": (
-                            getattr(result, "score", None)
-                            if not isinstance(result, dict)
-                            else result.get("score")
-                        ),
-                    }
-                    for result in results
-                ]
+                "results": [result.model_dump() for result in item.results]
            }
Contributor


⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's read the specific file and understand the context around lines 123-127
cat -n src/app/endpoints/query_v2.py | head -150 | tail -50

Repository: lightspeed-core/lightspeed-stack

Length of output: 208


🏁 Script executed:

# Also check the full file structure to understand what we're working with
wc -l src/app/endpoints/query_v2.py

Repository: lightspeed-core/lightspeed-stack

Length of output: 109


🏁 Script executed:

# Let's search for the item.results usage and understand its type
rg "item\.results" src/app/endpoints/query_v2.py -B 3 -A 3

Repository: lightspeed-core/lightspeed-stack

Length of output: 717


🏁 Script executed:

# Search for model_dump usage in this file to understand patterns
rg "model_dump" src/app/endpoints/query_v2.py -B 2 -A 2

Repository: lightspeed-core/lightspeed-stack

Length of output: 267


🏁 Script executed:

# Look for any dict handling with results in this file
rg "results.*dict|dict.*results" src/app/endpoints/query_v2.py -B 2 -A 2

Repository: lightspeed-core/lightspeed-stack

Length of output: 58


🏁 Script executed:

# Check type annotations and imports to understand data types
head -50 src/app/endpoints/query_v2.py

Repository: lightspeed-core/lightspeed-stack

Length of output: 1680


🏁 Script executed:

# Let's get more context around the code and understand the full function
sed -n '100,150p' src/app/endpoints/query_v2.py

Repository: lightspeed-core/lightspeed-stack

Length of output: 2045


🏁 Script executed:

# Search for more result processing patterns to find where dicts are allegedly handled
rg "results" src/app/endpoints/query_v2.py -B 2 -A 2 | head -100

Repository: lightspeed-core/lightspeed-stack

Length of output: 2144


🏁 Script executed:

# Check if there's any handling that creates dicts from results
rg '"results"' src/app/endpoints/query_v2.py -B 3 -A 3

Repository: lightspeed-core/lightspeed-stack

Length of output: 862


🏁 Script executed:

# Let's look for the type definition - check if there are any type stubs or if we can infer from usage
rg "OpenAIResponseOutputMessageFileSearchToolCall" . -B 2 -A 2

Repository: lightspeed-core/lightspeed-stack

Length of output: 806


🏁 Script executed:

# Check if there's a models or type definition file in the project
find . -name "*.py" -type f | grep -E "(models|types)" | head -20

Repository: lightspeed-core/lightspeed-stack

Length of output: 757


🏁 Script executed:

# Let's see the full context of where dicts are handled
sed -n '550,650p' src/app/endpoints/query_v2.py

Repository: lightspeed-core/lightspeed-stack

Length of output: 4838


🏁 Script executed:

# Get the full section of the dict handling code
rg -A 10 "Handle both object and dict access" src/app/endpoints/query_v2.py

Repository: lightspeed-core/lightspeed-stack

Length of output: 1262


🏁 Script executed:

# Let's understand when results might be dicts vs objects
sed -n '540,580p' src/app/endpoints/query_v2.py

Repository: lightspeed-core/lightspeed-stack

Length of output: 2027


🏁 Script executed:

# Check the extraction_text_from_response_output_item function to understand the pattern better
ast-grep --pattern 'def extract_text_from_response_output_item($_) {
  $$$
}'

Repository: lightspeed-core/lightspeed-stack

Length of output: 58


Handle dict results to avoid AttributeError.
result.model_dump() will fail if item.results contains dicts. Other parts of this same file already handle this inconsistency (e.g., lines ~540-580 use isinstance(result, dict) checks for the same results field), and the token usage function documents that llama_stack returns both dicts and objects due to internal type inconsistencies. Use a robust check to handle both cases:

🐛 Robust result handling
-            response_payload = {
-                "results": [result.model_dump() for result in item.results]
-            }
+            response_payload = {
+                "results": [
+                    result if isinstance(result, dict) else result.model_dump()
+                    for result in item.results
+                ]
+            }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-        response_payload: Optional[dict[str, Any]] = None
-        if item.results is not None:
-            response_payload = {
-                "results": [result.model_dump() for result in item.results]
-            }
+        response_payload: Optional[dict[str, Any]] = None
+        if item.results is not None:
+            response_payload = {
+                "results": [
+                    result if isinstance(result, dict) else result.model_dump()
+                    for result in item.results
+                ]
+            }

Comment on lines +165 to 176
+        content = item.error if item.error else (item.output if item.output else "")

        return ToolCallSummary(
-            id=str(getattr(output_item, "id")),
-            name=getattr(output_item, "name", "mcp_call"),
+            id=item.id,
+            name=item.name,
            args=args,
            type="mcp_call",
        ), ToolResultSummary(
-            id=str(getattr(output_item, "id")),
-            status=str(getattr(output_item, "status", None)),
-            content=getattr(output_item, "output", ""),
+            id=item.id,
+            status="success" if item.error is None else "failure",
+            content=content,
            type="mcp_call",
Contributor


⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Find the ToolResultSummary definition
rg "class ToolResultSummary" -A 15

Repository: lightspeed-core/lightspeed-stack

Length of output: 1059


🏁 Script executed:

# Check the context around line 165-176 in query_v2.py
cat -n src/app/endpoints/query_v2.py | sed -n '155,185p'

Repository: lightspeed-core/lightspeed-stack

Length of output: 1287


🏁 Script executed:

# Find what item's type is
rg "def.*item.*:" src/app/endpoints/query_v2.py -B 5 -A 1 | head -40

Repository: lightspeed-core/lightspeed-stack

Length of output: 58


🏁 Script executed:

# Check if json is already imported
grep -n "^import json\|^from json" src/app/endpoints/query_v2.py

Repository: lightspeed-core/lightspeed-stack

Length of output: 89


🏁 Script executed:

# Search for how ToolResultSummary is used downstream
rg "ToolResultSummary" src/app/endpoints/query_v2.py -B 2 -A 2

Repository: lightspeed-core/lightspeed-stack

Length of output: 1964


🏁 Script executed:

# Find OpenAIResponseOutputMessageMCPCall definition to see item.error and item.output types
rg "class OpenAIResponseOutputMessageMCPCall" -A 20

Repository: lightspeed-core/lightspeed-stack

Length of output: 58


🏁 Script executed:

# Also check if there are type hints for error and output fields
rg "OpenAIResponseOutputMessageMCPCall\|MCPCall" --type py -A 15 | head -60

Repository: lightspeed-core/lightspeed-stack

Length of output: 58


🏁 Script executed:

# Check the actual Llama Stack client types
fd "llama_stack" --type f -name "*.py" | xargs rg "MCPCall" -A 5 2>/dev/null | head -50

Repository: lightspeed-core/lightspeed-stack

Length of output: 1169


🏁 Script executed:

# Let's search for imports related to OpenAI response types
rg "from.*import.*OpenAIResponseOutput" -A 2

Repository: lightspeed-core/lightspeed-stack

Length of output: 58


🏁 Script executed:

# Check the imports at the top of query_v2.py to see where types come from
head -50 src/app/endpoints/query_v2.py

Repository: lightspeed-core/lightspeed-stack

Length of output: 1680


🏁 Script executed:

# Search for Llama Stack client imports to understand the types
rg "from llama_stack" src/app/endpoints/query_v2.py -A 10

Repository: lightspeed-core/lightspeed-stack

Length of output: 861


🏁 Script executed:

# Check if there's any documentation or usage that shows item.output/item.error types
rg "item\.output|item\.error" src/app/endpoints/query_v2.py -B 5 -A 2

Repository: lightspeed-core/lightspeed-stack

Length of output: 693


🏁 Script executed:

# Let's check what ToolResultSummary.content is expecting and how it's used
rg "ToolResultSummary" src/ -A 3 -B 1 | grep -A 10 "content"

Repository: lightspeed-core/lightspeed-stack

Length of output: 2595


🏁 Script executed:

# Check downstream usage - see if content is serialized to JSON later
rg "tool_results\|ToolResultSummary" src/app/endpoints/query_v2.py -A 3 | tail -30

Repository: lightspeed-core/lightspeed-stack

Length of output: 58


Serialize MCP output to string to match ToolResultSummary.content field type.
The ToolResultSummary.content field is typed as str. If item.error or item.output are dicts or lists from the Llama Stack API, they will be passed directly without serialization, causing type mismatches. This is inconsistent with other tool result types in the same function (web search, MCP list tools, and approval requests all use json.dumps() for structured content).

🧾 Serialize non-string outputs
-        content = item.error if item.error else (item.output if item.output else "")
+        raw_content = item.error if item.error is not None else item.output
+        if raw_content is None:
+            content = ""
+        elif isinstance(raw_content, str):
+            content = raw_content
+        else:
+            content = json.dumps(raw_content)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-        content = item.error if item.error else (item.output if item.output else "")
+        raw_content = item.error if item.error is not None else item.output
+        if raw_content is None:
+            content = ""
+        elif isinstance(raw_content, str):
+            content = raw_content
+        else:
+            content = json.dumps(raw_content)

        return ToolCallSummary(
            id=item.id,
            name=item.name,
            args=args,
            type="mcp_call",
        ), ToolResultSummary(
            id=item.id,
            status="success" if item.error is None else "failure",
            content=content,
            type="mcp_call",

Comment on lines +243 to +267
        elif event_type == "response.incomplete":
            error_response = InternalServerErrorResponse.query_failed(
                "An unexpected error occurred while processing the request."
            )
            logger.error("Error while obtaining answer for user question")
            yield format_stream_data(
                {"event": "error", "data": {**error_response.detail.model_dump()}}
            )
            return

        # Failed response - emit error with custom cause from error message
        elif event_type == "response.failed":
            failed_chunk = cast(OpenAIResponseObjectStreamResponseFailed, chunk)
            latest_response_object = failed_chunk.response
            error_message = (
                failed_chunk.response.error.message
                if failed_chunk.response.error
                else "An unexpected error occurred while processing the request."
            )
            error_response = InternalServerErrorResponse.query_failed(error_message)
            logger.error("Error while obtaining answer for user question")
            yield format_stream_data(
                {"event": "error", "data": {**error_response.detail.model_dump()}}
            )
            return
Contributor


⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Find the streaming_query_v2.py file and understand imports
fd streaming_query_v2 --type f

Repository: lightspeed-core/lightspeed-stack

Length of output: 167


🏁 Script executed:

# Look at the imports and type definitions at the top of the file
head -80 src/app/endpoints/streaming_query_v2.py

Repository: lightspeed-core/lightspeed-stack

Length of output: 2756


🏁 Script executed:

# Search for InternalServerErrorResponse class definition
rg "class InternalServerErrorResponse" -A 20

Repository: lightspeed-core/lightspeed-stack

Length of output: 1389


🏁 Script executed:

# Search for query_failed method signature
rg "def query_failed" -A 5 -B 2

Repository: lightspeed-core/lightspeed-stack

Length of output: 515


🏁 Script executed:

# Search for OpenAIResponseObjectStreamResponseFailed type definition
rg "OpenAIResponseObjectStreamResponseFailed" --type py -A 5

Repository: lightspeed-core/lightspeed-stack

Length of output: 1197


🏁 Script executed:

# Check if there's any type hint or definition for OpenAIResponseObjectStreamResponseFailed in the repo
rg "OpenAIResponseObjectStreamResponseFailed" -A 10 -B 5 --type py

Repository: lightspeed-core/lightspeed-stack

Length of output: 2831


🏁 Script executed:

# Look for any usage patterns of .error.message in the codebase
rg "\.error\.message" -B 3 -A 3

Repository: lightspeed-core/lightspeed-stack

Length of output: 748


🏁 Script executed:

# Check the actual code around lines 243-267 to see the full context
sed -n '230,275p' src/app/endpoints/streaming_query_v2.py

Repository: lightspeed-core/lightspeed-stack

Length of output: 2167


🌐 Web query:

OpenAIResponseObjectStreamResponseFailed llama_stack error message type nullable None

💡 Result:

Short answer — it means the code converted an OpenAI Responses stream item to a Llama Stack message but couldn't map the item's role to a message class, so message_type was None and the code failed.

Why it happens

  • Llama Stack maps roles to classes like "user", "system", "assistant", "developer". If the streamed response contains a different or unexpected role (or a nullable/None role), _get_message_type_by_role returns None and the conversion raises/fails. [1]

How to fix

  1. Inspect the streamed item that caused the failure (log input_item.role).
  2. Ensure the Responses stream uses one of the expected roles ("user", "system", "assistant", "developer").
  3. If you must accept other roles, update the mapping in _get_message_type_by_role to include your role -> message class, or add a safe fallback (e.g., treat unknown roles as user or skip). Example change:
  • before:
    role_to_type = { "user": OpenAIUserMessageParam, ... }
  • after (fallback):
    role_to_type.get(role, OpenAIUserMessageParam)
  4. Alternatively, add a clearer guard and error message when role is None to make debugging easier.

Relevant reference

  • Llama Stack OpenAI responses conversion shows exactly this mapping and the check that raises when message_type is None. [1]

If you want, paste the exact stacktrace or the streamed JSON payload and I’ll point to the exact field and a concrete patch.


🏁 Script executed:

# Look for test files that might show error structures
find . -name "*test*streaming*query*" -type f

Repository: lightspeed-core/lightspeed-stack

Length of output: 180


🏁 Script executed:

# Check requirements or pyproject to understand llama_stack version
rg "llama.*stack" --type toml --type txt -A 2

Repository: lightspeed-core/lightspeed-stack

Length of output: 1679


🏁 Script executed:

# Search for other usages of .error.message in the codebase to see patterns
rg "error\.message" -B 5 -A 2 --type py

Repository: lightspeed-core/lightspeed-stack

Length of output: 900


🏁 Script executed:

# Check if there are type annotations or imports that show Optional types for error
rg "Optional.*message" --type py

Repository: lightspeed-core/lightspeed-stack

Length of output: 246


🏁 Script executed:

# Check the test file to see error structures
cat tests/unit/app/endpoints/test_streaming_query_v2.py

Repository: lightspeed-core/lightspeed-stack

Length of output: 24359


🏁 Script executed:

# Search for any mock or fixture definitions of OpenAIResponseObjectStreamResponseFailed
rg "OpenAIResponseObjectStreamResponseFailed" -A 10 tests/

Repository: lightspeed-core/lightspeed-stack

Length of output: 58


🏁 Script executed:

# Check if there's any documentation or type checking configuration
rg "error.*message" -B 2 -A 2 tests/

Repository: lightspeed-core/lightspeed-stack

Length of output: 4494


🏁 Script executed:

# Search for any Python type hints or pydantic models that might show error structure
rg "error.*message" --type py -B 3 -A 3

Repository: lightspeed-core/lightspeed-stack

Length of output: 18755


🏁 Script executed:

# Check if there are any existing error handling patterns that check for None message
rg "message.*None" --type py -B 2 -A 2

Repository: lightspeed-core/lightspeed-stack

Length of output: 16923


🏁 Script executed:

# Try to find the actual definition or imports from llama_stack
rg "from llama_stack" -A 5 --type py | head -100

Repository: lightspeed-core/lightspeed-stack

Length of output: 5398


🌐 Web query:

llama_stack 0.3.5 OpenAIResponseObjectStreamResponseFailed error message nullable Optional

💡 Result:

Short answer: change OpenAIResponseError.message from a required str to an optional/nullable field (e.g. str | None or Optional[str]) in the OpenAI responses schema, and keep OpenAIResponseObject.error typed as OpenAIResponseError | None (it already is). See the llama-stack source where OpenAIResponseError and OpenAIResponseObject are defined. [1][2]

What to change (concise patch)

  • In llama_stack/apis/agents/openai_responses.py change:
    • from
      • code: str
      • message: str
    • to
      • code: str
      • message: str | None = None
        (or message: Optional[str] = None with a typing import)

Why: OpenAIResponseObject.error is already optional (OpenAIResponseError | None) but the error.message field is currently declared as a required str in the llama-stack schema; making message nullable matches cases where an error object may be present but message is absent. See definitions in the repo. [1][2]

If you prefer not to modify source, alternative:

  • Upgrade to a newer llama-stack version if a fix/schema change exists upstream, or override/validate responses on the client side to accept missing message.

Sources

  • OpenAI responses schema in llama-stack (OpenAIResponseError / OpenAIResponseObject). [1][2]

References
[1] llama-stack openai_responses.py — OpenAIResponseError definition (message currently str).
[2] llama-stack openai_responses.py — OpenAIResponseObject (error: OpenAIResponseError | None).


Add defensive check for None message in error handling.
When failed_chunk.response.error exists, failed_chunk.response.error.message can be None (despite the type annotation suggesting otherwise in llama_stack 0.3.5), which would violate the query_failed(cause: str) contract and cause a runtime error. Add an additional truthiness check for the message field:

error_message = (
    failed_chunk.response.error.message
    if failed_chunk.response.error
    and failed_chunk.response.error.message
    else "An unexpected error occurred while processing the request."
)

Comment on lines 1887 to 1902
    @classmethod
-    def query_failed(cls, backend_url: str) -> "InternalServerErrorResponse":
+    def query_failed(cls, cause: str) -> "InternalServerErrorResponse":
        """
-        Create an InternalServerErrorResponse representing a failed query to an external backend.
+        Create an InternalServerErrorResponse representing a failed query.
        Parameters:
-            backend_url (str): The backend URL included in the error cause message.
+            cause (str): The error cause message.
        Returns:
            InternalServerErrorResponse: An error response with response "Error
-                while processing query" and cause "Failed to call backend:
-                {backend_url}".
+                while processing query" and the provided cause.
        """
        return cls(
            response="Error while processing query",
-            cause=f"Failed to call backend: {backend_url}",
+            cause=cause,
        )
Contributor


⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

rg -n --type=py 'query_failed\(' -A2 -B1

Repository: lightspeed-core/lightspeed-stack

Length of output: 1987


🏁 Script executed:

cat -n src/models/responses.py | sed -n '1887,1902p'

Repository: lightspeed-core/lightspeed-stack

Length of output: 712


Update docstring section from Parameters: to Args: to follow Google Python conventions.

The query_failed method refactoring is correct—all call sites have been updated to pass the new cause parameter. However, the docstring uses Parameters: when it should use Args: per Google Python conventions specified in the coding guidelines.

Update line 1892 from:

        Parameters:

to:

        Args:

Comment on lines +60 to +118
async def create_violation_stream(
    message: str,
    shield_model: Optional[str] = None,
) -> AsyncIterator[OpenAIResponseObjectStream]:
    """Generate a minimal streaming response for cases where input is blocked by a shield.

    This yields only the essential streaming events to indicate that the input was rejected.
    Dummy item identifiers are used solely for protocol compliance and are not used later.
    """
    response_id = "resp_shield_violation"

    # Create the response object with empty output at the beginning
    response_obj = OpenAIResponseObject(
        id=response_id,
        created_at=0,  # not used
        model=shield_model or "shield",
        output=[],
        status="in_progress",
    )
    yield OpenAIResponseObjectStreamResponseCreated(response=response_obj)

    # Triggers empty initial token
    yield OpenAIResponseObjectStreamResponseContentPartAdded(
        content_index=0,
        response_id=response_id,
        item_id="msg_shield_violation_1",
        output_index=0,
        part=OpenAIResponseContentPartOutputText(text=""),
        sequence_number=0,
    )

    # Text delta
    yield OpenAIResponseObjectStreamResponseOutputTextDelta(
        content_index=1,
        delta=message,
        item_id="msg_shield_violation_2",
        output_index=1,
        sequence_number=1,
    )

    # Output text done
    yield OpenAIResponseObjectStreamResponseOutputTextDone(
        content_index=2,
        text=message,
        item_id="msg_shield_violation_3",
        output_index=2,
        sequence_number=2,
    )

    # Fill the output when message is completed
    response_obj.output = [
        OpenAIResponseMessage(
            id="msg_shield_violation_4",
            content=[OpenAIResponseOutputMessageContentOutputText(text=message)],
            role="assistant",
            status="completed",
        )
    ]
    # Update status to completed
Contributor


⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's check if the file exists and read the relevant section
cat -n src/utils/query.py | head -150

Repository: lightspeed-core/lightspeed-stack

Length of output: 4922


🏁 Script executed:

# Let's understand the type definitions for these OpenAI response classes
rg "class OpenAIResponseObjectStreamResponseOutputTextDelta|class OpenAIResponseObjectStreamResponseContentPartAdded|class OpenAIResponseObjectStreamResponseOutputTextDone" -A 15

Repository: lightspeed-core/lightspeed-stack

Length of output: 58


🏁 Script executed:

# Let's find usages of create_violation_stream to understand how it's consumed
rg "create_violation_stream" -B 2 -A 2

Repository: lightspeed-core/lightspeed-stack

Length of output: 1067


🏁 Script executed:

# Let's examine the usage in streaming_query_v2.py
cat -n src/app/endpoints/streaming_query_v2.py | grep -A 20 -B 20 "create_violation_stream"

Repository: lightspeed-core/lightspeed-stack

Length of output: 3593


🏁 Script executed:

# Look for other stream creation examples to see what pattern is used
rg "OpenAIResponseObjectStreamResponse" src/ -B 2 -A 10 | head -100

Repository: lightspeed-core/lightspeed-stack

Length of output: 6023


🏁 Script executed:

# Let's see how the stream events are actually processed and if indices are used
rg "output_index|content_index" src/app/endpoints/streaming_query_v2.py -B 3 -A 3

Repository: lightspeed-core/lightspeed-stack

Length of output: 58


🏁 Script executed:

# Search for any usage of output_index or content_index in the codebase
rg "output_index|content_index" src/ --type py

Repository: lightspeed-core/lightspeed-stack

Length of output: 336


🏁 Script executed:

# Let's search for any documentation or examples related to OpenAI response streams
rg "output_index|content_index" . -B 5 -A 5 --type md --type txt 2>/dev/null || true

Repository: lightspeed-core/lightspeed-stack

Length of output: 58


🏁 Script executed:

# Let's check if there are any comments or documentation about the Llama Stack types
# Look for any imports or references that might explain the semantics
rg "from llama_stack" src/ -B 2 -A 10 | head -80

Repository: lightspeed-core/lightspeed-stack

Length of output: 3943


Keep output_index/content_index consistent for a single output item.

The stream currently emits output_index and content_index as 0 → 1 → 2 even though only one output message is produced. Since response_obj.output contains exactly one element, all events should reference output_index=0 and content_index=0. Clients expecting these indices to map to array positions in the response will encounter semantic inconsistencies.

🔧 Suggested fix
-    yield OpenAIResponseObjectStreamResponseOutputTextDelta(
-        content_index=1,
-        delta=message,
-        item_id="msg_shield_violation_2",
-        output_index=1,
-        sequence_number=1,
-    )
+    yield OpenAIResponseObjectStreamResponseOutputTextDelta(
+        content_index=0,
+        delta=message,
+        item_id="msg_shield_violation_1",
+        output_index=0,
+        sequence_number=1,
+    )
@@
-    yield OpenAIResponseObjectStreamResponseOutputTextDone(
-        content_index=2,
-        text=message,
-        item_id="msg_shield_violation_3",
-        output_index=2,
-        sequence_number=2,
-    )
+    yield OpenAIResponseObjectStreamResponseOutputTextDone(
+        content_index=0,
+        text=message,
+        item_id="msg_shield_violation_1",
+        output_index=0,
+        sequence_number=2,
+    )

Contributor

@tisnik tisnik left a comment


interesting change, LGTM

@tisnik tisnik merged commit ab07691 into lightspeed-core:main Jan 16, 2026
21 of 23 checks passed
