feat(integrations): openai-agents streaming support #5291
base: master
Changes from all commits: e5ad308, 8884375, c446c35, ca2b9ba, e5d0ffa, 5e0d558, 5f73e4f
```diff
@@ -1,5 +1,5 @@
 from .models import _create_get_model_wrapper  # noqa: F401
 from .tools import _create_get_all_tools_wrapper  # noqa: F401
-from .runner import _create_run_wrapper  # noqa: F401
+from .runner import _create_run_wrapper, _create_run_streamed_wrapper  # noqa: F401
 from .agent_run import _patch_agent_run  # noqa: F401
 from .error_tracing import _patch_error_tracing  # noqa: F401
```
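For orientation, a hedged sketch of how the newly exported wrapper is presumably wired up during integration setup. The patch site, the `classmethod` re-wrapping detail, and the `_patch_runner` helper name are assumptions, not part of this diff:

```python
# Hypothetical wiring, assuming the file above is patches/__init__.py and the
# integration replaces Runner's entry points when it is enabled.
import agents

from .patches import _create_run_wrapper, _create_run_streamed_wrapper


def _patch_runner() -> None:
    # run() is awaited by callers; run_streamed() returns a streaming result
    # synchronously, so each entry point needs its own wrapper.
    agents.Runner.run = classmethod(_create_run_wrapper(agents.Runner.run))
    agents.Runner.run_streamed = classmethod(
        _create_run_streamed_wrapper(agents.Runner.run_streamed)
    )
```

The hunks below are from the module that defines `_create_get_model_wrapper` (presumably `patches/models.py`).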
```diff
@@ -9,15 +9,24 @@
 from typing import TYPE_CHECKING
 
 if TYPE_CHECKING:
-    from typing import Any, Callable
+    from typing import Any, Callable, Optional
 
 try:
     import agents
 except ImportError:
     raise DidNotEnable("OpenAI Agents not installed")
 
 
+def _set_response_model_on_agent_span(
+    agent: "agents.Agent", response_model: "Optional[str]"
+) -> None:
+    """Set the response model on the agent's invoke_agent span if available."""
+    if response_model:
+        agent_span = getattr(agent, "_sentry_agent_span", None)
+        if agent_span:
+            agent_span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, response_model)
+
+
 def _create_get_model_wrapper(
     original_get_model: "Callable[..., Any]",
 ) -> "Callable[..., Any]":
```
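The helper reads `agent._sentry_agent_span`, which the integration stashes elsewhere when the `invoke_agent` span is opened. A minimal sketch of that hand-off, assuming `sentry_sdk.start_span` and an op name along these lines (both the op string and the patch site are assumptions):

```python
# Hedged sketch (not part of this diff): where _sentry_agent_span would be
# stashed so _set_response_model_on_agent_span can find it later.
import sentry_sdk


def _start_invoke_agent_span(agent):
    span = sentry_sdk.start_span(op="gen_ai.invoke_agent", name=agent.name)
    span.__enter__()  # kept open across the agent run, closed when it ends
    agent._sentry_agent_span = span  # hand-off point used by the helper above
    return span
```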
```diff
@@ -37,15 +46,19 @@ def wrapped_get_model(
         # because we only patch its direct methods, all underlying data can remain unchanged.
         model = copy.copy(original_get_model(agent, run_config))
 
-        # Wrap _fetch_response if it exists (for OpenAI models) to capture raw response model
+        # Capture the request model name for spans (agent.model can be None when using defaults)
+        request_model_name = model.model if hasattr(model, "model") else str(model)
+        agent._sentry_request_model = request_model_name
+
+        # Wrap _fetch_response if it exists (for OpenAI models) to capture response model
         if hasattr(model, "_fetch_response"):
             original_fetch_response = model._fetch_response
 
             @wraps(original_fetch_response)
             async def wrapped_fetch_response(*args: "Any", **kwargs: "Any") -> "Any":
                 response = await original_fetch_response(*args, **kwargs)
-                if hasattr(response, "model"):
-                    agent._sentry_raw_response_model = str(response.model)
+                if hasattr(response, "model") and response.model:
+                    agent._sentry_response_model = str(response.model)
                 return response
 
             model._fetch_response = wrapped_fetch_response
```
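The `copy.copy` at the top of this hunk is what makes the per-call patching safe: wrapping methods on a shallow copy leaves the (possibly shared or cached) original model untouched. A self-contained illustration of the pattern, using throwaway names rather than the integration's classes:

```python
import copy
from functools import wraps


class Model:
    def fetch(self):
        return "response"


shared = Model()
patched = copy.copy(shared)  # new instance dict, same underlying data

original_fetch = patched.fetch  # bound method, still the class implementation


@wraps(original_fetch)
def wrapped_fetch():
    result = original_fetch()
    print("captured:", result)  # side channel, like _sentry_response_model
    return result


patched.fetch = wrapped_fetch  # instance attribute shadows the class method
patched.fetch()  # prints "captured: response"
shared.fetch()   # unaffected: the wrapper lives only on the copy
```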
```diff
@@ -57,22 +70,74 @@ async def wrapped_get_response(*args: "Any", **kwargs: "Any") -> "Any":
             with ai_client_span(agent, kwargs) as span:
                 result = await original_get_response(*args, **kwargs)
 
-                response_model = getattr(agent, "_sentry_raw_response_model", None)
+                # Get response model captured from _fetch_response and clean up
+                response_model = getattr(agent, "_sentry_response_model", None)
                 if response_model:
-                    agent_span = getattr(agent, "_sentry_agent_span", None)
-                    if agent_span:
-                        agent_span.set_data(
-                            SPANDATA.GEN_AI_RESPONSE_MODEL, response_model
-                        )
-
-                    delattr(agent, "_sentry_raw_response_model")
+                    delattr(agent, "_sentry_response_model")
 
-                update_ai_client_span(span, agent, kwargs, result, response_model)
+                _set_response_model_on_agent_span(agent, response_model)
+                update_ai_client_span(span, result, response_model)
 
                 return result
 
         model.get_response = wrapped_get_response
 
+        # Also wrap stream_response for streaming support
+        if hasattr(model, "stream_response"):
+            original_stream_response = model.stream_response
+
+            @wraps(original_stream_response)
+            async def wrapped_stream_response(*args: "Any", **kwargs: "Any") -> "Any":
+                """
+                Wrap stream_response to create an AI client span for streaming.
+                stream_response is an async generator, so we yield events within the span.
+                Note: stream_response is called with positional args unlike get_response
+                which uses keyword args. The signature is:
+                stream_response(
+                    system_instructions,  # args[0]
+                    input,  # args[1]
+                    model_settings,  # args[2]
+                    tools,  # args[3]
+                    output_schema,  # args[4]
+                    handoffs,  # args[5]
+                    tracing,  # args[6]
+                    *,
+                    previous_response_id,
+                    conversation_id,
+                    prompt,
+                )
+                """
+                # Build kwargs dict from positional args for span data capture
+                span_kwargs = dict(kwargs)
+                if len(args) > 0:
+                    span_kwargs["system_instructions"] = args[0]
+                if len(args) > 1:
+                    span_kwargs["input"] = args[1]
+
+                with ai_client_span(agent, span_kwargs) as span:
+                    span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)
+
+                    streaming_response = None
+                    async for event in original_stream_response(*args, **kwargs):
+                        # Capture the full response from ResponseCompletedEvent
+                        if hasattr(event, "response"):
+                            streaming_response = event.response
+                        yield event
```
Comment on lines +118 to +126

Bug: A synchronous […]

🔍 Detailed Analysis
The function […]

💡 Suggested Fix
Use […]
```diff
+
+                    # Update span with response data (usage, output, model)
+                    if streaming_response:
+                        response_model = (
+                            str(streaming_response.model)
+                            if hasattr(streaming_response, "model")
+                            and streaming_response.model
+                            else None
+                        )
+                        _set_response_model_on_agent_span(agent, response_model)
+                        update_ai_client_span(span, streaming_response)
+
+            model.stream_response = wrapped_stream_response
 
         return model
 
     return wrapped_get_model
```
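To see the new code path end to end, a hedged usage sketch: `Runner.run_streamed` and `stream_events()` are the Agents SDK streaming entry points this PR instruments, while the DSN and agent configuration are placeholders.

```python
import asyncio

import sentry_sdk
from agents import Agent, Runner


async def main() -> None:
    sentry_sdk.init(dsn="https://examplePublicKey@o0.ingest.sentry.io/0")

    agent = Agent(name="assistant", instructions="Reply briefly.")
    result = Runner.run_streamed(agent, "Say hello.")

    # Each event here passes through wrapped_stream_response above, which
    # holds the gen_ai client span open until the stream is exhausted and
    # marks it with GEN_AI_RESPONSE_STREAMING = True.
    async for event in result.stream_events():
        pass  # e.g. forward text deltas to a UI


asyncio.run(main())
```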
Workflow span ignores exception info when streaming fails

Medium Severity

When streaming execution fails during `execute_final_output`, the workflow span is closed with `workflow_span.__exit__(None, None, None)` instead of `workflow_span.__exit__(*sys.exc_info())`. Since this code is in a `finally` block, if an exception was raised in the `try` block, `sys.exc_info()` would contain the exception info. By passing `(None, None, None)`, the workflow span won't properly indicate that an error occurred. This is inconsistent with the error handling in `_create_run_streamed_wrapper` (runner.py line 104), which correctly passes `*sys.exc_info()` when an exception occurs.