From e5ad308c22092ff199d5b97ec267988cafe9152b Mon Sep 17 00:00:00 2001 From: Fabian Schindler Date: Mon, 12 Jan 2026 08:47:10 +0100 Subject: [PATCH 1/9] feat(integrations): openai-agents streaming support --- sentry_sdk/integrations/mcp.py | 29 +++++++- .../integrations/openai_agents/__init__.py | 9 ++- .../openai_agents/patches/__init__.py | 2 +- .../openai_agents/patches/agent_run.py | 41 +++++++++++ .../openai_agents/patches/runner.py | 72 +++++++++++++++++++ .../openai_agents/test_openai_agents.py | 21 ++++++ 6 files changed, 169 insertions(+), 5 deletions(-) diff --git a/sentry_sdk/integrations/mcp.py b/sentry_sdk/integrations/mcp.py index 47fda272b7..a8dc78f18e 100644 --- a/sentry_sdk/integrations/mcp.py +++ b/sentry_sdk/integrations/mcp.py @@ -7,6 +7,7 @@ Supports the low-level `mcp.server.lowlevel.Server` API. """ +from contextlib import contextmanager import inspect from functools import wraps from typing import TYPE_CHECKING @@ -352,6 +353,30 @@ def _prepare_handler_data( ) +@contextmanager +def ensure_span(*args, **kwargs): + """Ensure a span is created for the current context.""" + + current_span = sentry_sdk.get_current_span() + transaction_exists = ( + current_span is not None and current_span.containing_transaction is not None + ) + + if transaction_exists: + with sentry_sdk.start_span(*args, **kwargs) as span: + yield span + else: + with sentry_sdk.start_transaction(*args, **kwargs): + with sentry_sdk.start_span(*args, **kwargs) as span: + yield span + # with get_start_span_function()( + # op=OP.MCP_SERVER, + # name=span_name, + # origin=MCPIntegration.origin, + # ) as span: + # yield span + + async def _async_handler_wrapper( handler_type: str, func: "Callable[..., Any]", @@ -382,7 +407,7 @@ async def _async_handler_wrapper( ) = _prepare_handler_data(handler_type, original_args, original_kwargs) # Start span and execute - with get_start_span_function()( + with ensure_span( op=OP.MCP_SERVER, name=span_name, origin=MCPIntegration.origin, @@ -454,7 +479,7 @@ def _sync_handler_wrapper( ) = _prepare_handler_data(handler_type, original_args) # Start span and execute - with get_start_span_function()( + with ensure_span( op=OP.MCP_SERVER, name=span_name, origin=MCPIntegration.origin, diff --git a/sentry_sdk/integrations/openai_agents/__init__.py b/sentry_sdk/integrations/openai_agents/__init__.py index b121557fbd..deb136de01 100644 --- a/sentry_sdk/integrations/openai_agents/__init__.py +++ b/sentry_sdk/integrations/openai_agents/__init__.py @@ -4,6 +4,7 @@ _create_get_model_wrapper, _create_get_all_tools_wrapper, _create_run_wrapper, + _create_run_streamed_wrapper, _patch_agent_run, _patch_error_tracing, ) @@ -25,12 +26,16 @@ def _patch_runner() -> None: # Create the root span for one full agent run (including eventual handoffs) # Note agents.run.DEFAULT_AGENT_RUNNER.run_sync is a wrapper around # agents.run.DEFAULT_AGENT_RUNNER.run. It does not need to be wrapped separately. - # TODO-anton: Also patch streaming runner: agents.Runner.run_streamed agents.run.DEFAULT_AGENT_RUNNER.run = _create_run_wrapper( agents.run.DEFAULT_AGENT_RUNNER.run ) - # Creating the actual spans for each agent run. + # Patch streaming runner + agents.run.DEFAULT_AGENT_RUNNER.run_streamed = _create_run_streamed_wrapper( + agents.run.DEFAULT_AGENT_RUNNER.run_streamed + ) + + # Creating the actual spans for each agent run (works for both streaming and non-streaming). 
_patch_agent_run() diff --git a/sentry_sdk/integrations/openai_agents/patches/__init__.py b/sentry_sdk/integrations/openai_agents/patches/__init__.py index 33058f01a1..b53ca79e19 100644 --- a/sentry_sdk/integrations/openai_agents/patches/__init__.py +++ b/sentry_sdk/integrations/openai_agents/patches/__init__.py @@ -1,5 +1,5 @@ from .models import _create_get_model_wrapper # noqa: F401 from .tools import _create_get_all_tools_wrapper # noqa: F401 -from .runner import _create_run_wrapper # noqa: F401 +from .runner import _create_run_wrapper, _create_run_streamed_wrapper # noqa: F401 from .agent_run import _patch_agent_run # noqa: F401 from .error_tracing import _patch_error_tracing # noqa: F401 diff --git a/sentry_sdk/integrations/openai_agents/patches/agent_run.py b/sentry_sdk/integrations/openai_agents/patches/agent_run.py index 29649af945..5b29f2ddaa 100644 --- a/sentry_sdk/integrations/openai_agents/patches/agent_run.py +++ b/sentry_sdk/integrations/openai_agents/patches/agent_run.py @@ -31,6 +31,7 @@ def _patch_agent_run() -> None: # Store original methods original_run_single_turn = agents.run.AgentRunner._run_single_turn + original_run_single_turn_streamed = agents.run.AgentRunner._run_single_turn_streamed original_execute_handoffs = agents._run_impl.RunImpl.execute_handoffs original_execute_final_output = agents._run_impl.RunImpl.execute_final_output @@ -149,8 +150,48 @@ async def patched_execute_final_output( return result + @wraps( + original_run_single_turn_streamed.__func__ + if hasattr(original_run_single_turn_streamed, "__func__") + else original_run_single_turn_streamed + ) + async def patched_run_single_turn_streamed( + cls: "agents.Runner", *args: "Any", **kwargs: "Any" + ) -> "Any": + """Patched _run_single_turn_streamed that creates agent invocation spans for streaming""" + agent = kwargs.get("agent") + context_wrapper = kwargs.get("context_wrapper") + should_run_agent_start_hooks = kwargs.get("should_run_agent_start_hooks") + + span = getattr(context_wrapper, "_sentry_agent_span", None) + # Start agent span when agent starts (but only once per agent) + if should_run_agent_start_hooks and agent and context_wrapper: + # End any existing span for a different agent + if _has_active_agent_span(context_wrapper): + current_agent = _get_current_agent(context_wrapper) + if current_agent and current_agent != agent: + end_invoke_agent_span(context_wrapper, current_agent) + + span = _start_invoke_agent_span(context_wrapper, agent, kwargs) + agent._sentry_agent_span = span + + # Call original streaming method + try: + result = await original_run_single_turn_streamed(*args, **kwargs) + except Exception as exc: + if span is not None and span.timestamp is None: + _record_exception_on_span(span, exc) + end_invoke_agent_span(context_wrapper, agent) + + reraise(*sys.exc_info()) + + return result + # Apply patches agents.run.AgentRunner._run_single_turn = classmethod(patched_run_single_turn) + agents.run.AgentRunner._run_single_turn_streamed = classmethod( + patched_run_single_turn_streamed + ) agents._run_impl.RunImpl.execute_handoffs = classmethod(patched_execute_handoffs) agents._run_impl.RunImpl.execute_final_output = classmethod( patched_execute_final_output diff --git a/sentry_sdk/integrations/openai_agents/patches/runner.py b/sentry_sdk/integrations/openai_agents/patches/runner.py index 1d3bbc894b..656f509c96 100644 --- a/sentry_sdk/integrations/openai_agents/patches/runner.py +++ b/sentry_sdk/integrations/openai_agents/patches/runner.py @@ -1,3 +1,4 @@ +import sys from functools 
import wraps import sentry_sdk @@ -64,3 +65,74 @@ async def wrapper(*args: "Any", **kwargs: "Any") -> "Any": return run_result return wrapper + + +def _create_run_streamed_wrapper( + original_func: "Callable[..., Any]", +) -> "Callable[..., Any]": + """ + Wraps the agents.Runner.run_streamed method to create a root span for streaming agent workflow runs. + + Unlike run(), run_streamed() returns immediately with a RunResultStreaming object + while execution continues in a background task. The workflow span must stay open + throughout the streaming operation and close when streaming completes or is abandoned. + """ + + @wraps(original_func) + def wrapper(*args: "Any", **kwargs: "Any") -> "Any": + # Isolate each workflow so that when agents are run in asyncio tasks they + # don't touch each other's scopes + isolation_scope = sentry_sdk.isolation_scope() + isolation_scope.__enter__() + + # Clone agent because agent invocation spans are attached per run. + agent = args[0].clone() + + # Start workflow span immediately (before run_streamed returns) + workflow_span = agent_workflow_span(agent) + workflow_span.__enter__() + + # Store span and scope on agent for cleanup + agent._sentry_workflow_span = workflow_span + agent._sentry_isolation_scope = isolation_scope + + args = (agent, *args[1:]) + + try: + # Call original function to get RunResultStreaming + run_result = original_func(*args, **kwargs) + except Exception as exc: + # If run_streamed itself fails (not the background task), clean up immediately + workflow_span.__exit__(*sys.exc_info()) + isolation_scope.__exit__(None, None, None) + _capture_exception(exc) + raise exc from None + + # Wrap the result to ensure cleanup when streaming completes + original_aclose = getattr(run_result, "aclose", None) + + async def wrapped_aclose() -> None: + """Close streaming result and clean up Sentry spans""" + try: + if original_aclose is not None: + await original_aclose() + finally: + # End any remaining agent span + if hasattr(run_result, "context_wrapper"): + end_invoke_agent_span(run_result.context_wrapper, agent) + + # End workflow span + if hasattr(agent, "_sentry_workflow_span"): + workflow_span.__exit__(None, None, None) + delattr(agent, "_sentry_workflow_span") + + # Exit isolation scope + if hasattr(agent, "_sentry_isolation_scope"): + isolation_scope.__exit__(None, None, None) + delattr(agent, "_sentry_isolation_scope") + + run_result.aclose = wrapped_aclose + + return run_result + + return wrapper diff --git a/tests/integrations/openai_agents/test_openai_agents.py b/tests/integrations/openai_agents/test_openai_agents.py index c5cb25dfee..85e71b3da1 100644 --- a/tests/integrations/openai_agents/test_openai_agents.py +++ b/tests/integrations/openai_agents/test_openai_agents.py @@ -1998,3 +1998,24 @@ def test_openai_agents_message_truncation(sentry_init, capture_events): assert len(parsed_messages) == 2 assert "small message 4" in str(parsed_messages[0]) assert "small message 5" in str(parsed_messages[1]) + + +def test_streaming_patches_applied(sentry_init): + """ + Test that the streaming patches are applied correctly. 
+    """
+    sentry_init(
+        integrations=[OpenAIAgentsIntegration()],
+        traces_sample_rate=1.0,
+    )
+
+    # Verify that run_streamed is patched (will have __wrapped__ attribute if patched)
+    import agents
+
+    # Check that the method exists and has been modified
+    assert hasattr(agents.run.DEFAULT_AGENT_RUNNER, "run_streamed")
+    assert hasattr(agents.run.AgentRunner, "_run_single_turn_streamed")
+
+    # Verify the patches were applied by checking for our wrapper
+    run_streamed_func = agents.run.DEFAULT_AGENT_RUNNER.run_streamed
+    assert hasattr(run_streamed_func, "__wrapped__")

From 8884375874931937a090c5cfa64b80801198e88b Mon Sep 17 00:00:00 2001
From: Fabian Schindler
Date: Mon, 12 Jan 2026 13:18:04 +0100
Subject: [PATCH 2/9] fix: improved instrumentation

---
 .../openai_agents/patches/agent_run.py | 43 ++++++++++++++--
 .../openai_agents/patches/models.py | 51 +++++++++++++++++++
 .../openai_agents/patches/runner.py | 40 +++------------
 3 files changed, 97 insertions(+), 37 deletions(-)

diff --git a/sentry_sdk/integrations/openai_agents/patches/agent_run.py b/sentry_sdk/integrations/openai_agents/patches/agent_run.py
index 5b29f2ddaa..0241ac784f 100644
--- a/sentry_sdk/integrations/openai_agents/patches/agent_run.py
+++ b/sentry_sdk/integrations/openai_agents/patches/agent_run.py
@@ -1,6 +1,7 @@
 import sys
 from functools import wraps
 
+from sentry_sdk.consts import SPANDATA
 from sentry_sdk.integrations import DidNotEnable
 from sentry_sdk.utils import reraise
 from ..spans import (
@@ -148,6 +149,13 @@ async def patched_execute_final_output(
             if agent and context_wrapper and _has_active_agent_span(context_wrapper):
                 end_invoke_agent_span(context_wrapper, agent, final_output)
 
+            # For streaming: close the workflow span if it exists
+            # (For non-streaming, the workflow span is closed by the context manager in _create_run_wrapper)
+            if agent and hasattr(agent, "_sentry_workflow_span"):
+                workflow_span = agent._sentry_workflow_span
+                workflow_span.__exit__(None, None, None)
+                delattr(agent, "_sentry_workflow_span")
+
         return result
 
     @wraps(
         original_run_single_turn_streamed.__func__
         if hasattr(original_run_single_turn_streamed, "__func__")
         else original_run_single_turn_streamed
     )
     async def patched_run_single_turn_streamed(
         cls: "agents.Runner", *args: "Any", **kwargs: "Any"
     ) -> "Any":
-        """Patched _run_single_turn_streamed that creates agent invocation spans for streaming"""
-        agent = kwargs.get("agent")
-        context_wrapper = kwargs.get("context_wrapper")
-        should_run_agent_start_hooks = kwargs.get("should_run_agent_start_hooks")
+        """Patched _run_single_turn_streamed that creates agent invocation spans for streaming.
+
+        Note: Unlike _run_single_turn which uses keyword-only arguments (*,),
+        _run_single_turn_streamed uses positional arguments.
The call signature is: + _run_single_turn_streamed( + streamed_result, # args[0] + agent, # args[1] + hooks, # args[2] + context_wrapper, # args[3] + run_config, # args[4] + should_run_agent_start_hooks, # args[5] + tool_use_tracker, # args[6] + all_tools, # args[7] + server_conversation_tracker, # args[8] (optional) + ) + """ + # Extract positional arguments (streaming version doesn't use keyword-only args) + agent = args[1] if len(args) > 1 else kwargs.get("agent") + context_wrapper = args[3] if len(args) > 3 else kwargs.get("context_wrapper") + should_run_agent_start_hooks = ( + args[5] if len(args) > 5 else kwargs.get("should_run_agent_start_hooks") + ) span = getattr(context_wrapper, "_sentry_agent_span", None) # Start agent span when agent starts (but only once per agent) @@ -172,7 +198,14 @@ async def patched_run_single_turn_streamed( if current_agent and current_agent != agent: end_invoke_agent_span(context_wrapper, current_agent) - span = _start_invoke_agent_span(context_wrapper, agent, kwargs) + # Build kwargs dict for span creation (for compatibility with _start_invoke_agent_span) + span_kwargs = { + "agent": agent, + "context_wrapper": context_wrapper, + "should_run_agent_start_hooks": should_run_agent_start_hooks, + } + span = _start_invoke_agent_span(context_wrapper, agent, span_kwargs) + span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True) agent._sentry_agent_span = span # Call original streaming method diff --git a/sentry_sdk/integrations/openai_agents/patches/models.py b/sentry_sdk/integrations/openai_agents/patches/models.py index a9b3c16a22..8099cfef89 100644 --- a/sentry_sdk/integrations/openai_agents/patches/models.py +++ b/sentry_sdk/integrations/openai_agents/patches/models.py @@ -73,6 +73,57 @@ async def wrapped_get_response(*args: "Any", **kwargs: "Any") -> "Any": model.get_response = wrapped_get_response + # Also wrap stream_response for streaming support + if hasattr(model, "stream_response"): + original_stream_response = model.stream_response + + @wraps(original_stream_response) + async def wrapped_stream_response(*args: "Any", **kwargs: "Any") -> "Any": + """ + Wrap stream_response to create an AI client span for streaming. + stream_response is an async generator, so we yield events within the span. + + Note: stream_response is called with positional args unlike get_response + which uses keyword args. 
The signature is: + stream_response( + system_instructions, # args[0] + input, # args[1] + model_settings, # args[2] + tools, # args[3] + output_schema, # args[4] + handoffs, # args[5] + tracing, # args[6] + *, + previous_response_id, + conversation_id, + prompt, + ) + """ + # Build kwargs dict from positional args for span data capture + span_kwargs = dict(kwargs) + if len(args) > 0: + span_kwargs["system_instructions"] = args[0] + if len(args) > 1: + span_kwargs["input"] = args[1] + + with ai_client_span(agent, span_kwargs) as span: + span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True) + + async for event in original_stream_response(*args, **kwargs): + yield event + + # Get response model if captured + response_model = getattr(agent, "_sentry_raw_response_model", None) + if response_model: + agent_span = getattr(agent, "_sentry_agent_span", None) + if agent_span: + agent_span.set_data( + SPANDATA.GEN_AI_RESPONSE_MODEL, response_model + ) + delattr(agent, "_sentry_raw_response_model") + + model.stream_response = wrapped_stream_response + return model return wrapped_get_model diff --git a/sentry_sdk/integrations/openai_agents/patches/runner.py b/sentry_sdk/integrations/openai_agents/patches/runner.py index 656f509c96..7fa8d1c2a2 100644 --- a/sentry_sdk/integrations/openai_agents/patches/runner.py +++ b/sentry_sdk/integrations/openai_agents/patches/runner.py @@ -76,15 +76,14 @@ def _create_run_streamed_wrapper( Unlike run(), run_streamed() returns immediately with a RunResultStreaming object while execution continues in a background task. The workflow span must stay open throughout the streaming operation and close when streaming completes or is abandoned. + + Note: We don't use isolation_scope() here because it uses context variables that + cannot span async boundaries (the __enter__ and __exit__ would be called from + different async contexts, causing ValueError). """ @wraps(original_func) def wrapper(*args: "Any", **kwargs: "Any") -> "Any": - # Isolate each workflow so that when agents are run in asyncio tasks they - # don't touch each other's scopes - isolation_scope = sentry_sdk.isolation_scope() - isolation_scope.__enter__() - # Clone agent because agent invocation spans are attached per run. 
agent = args[0].clone() @@ -92,9 +91,8 @@ def wrapper(*args: "Any", **kwargs: "Any") -> "Any": workflow_span = agent_workflow_span(agent) workflow_span.__enter__() - # Store span and scope on agent for cleanup + # Store span on agent for cleanup agent._sentry_workflow_span = workflow_span - agent._sentry_isolation_scope = isolation_scope args = (agent, *args[1:]) @@ -104,34 +102,12 @@ def wrapper(*args: "Any", **kwargs: "Any") -> "Any": except Exception as exc: # If run_streamed itself fails (not the background task), clean up immediately workflow_span.__exit__(*sys.exc_info()) - isolation_scope.__exit__(None, None, None) _capture_exception(exc) raise exc from None - # Wrap the result to ensure cleanup when streaming completes - original_aclose = getattr(run_result, "aclose", None) - - async def wrapped_aclose() -> None: - """Close streaming result and clean up Sentry spans""" - try: - if original_aclose is not None: - await original_aclose() - finally: - # End any remaining agent span - if hasattr(run_result, "context_wrapper"): - end_invoke_agent_span(run_result.context_wrapper, agent) - - # End workflow span - if hasattr(agent, "_sentry_workflow_span"): - workflow_span.__exit__(None, None, None) - delattr(agent, "_sentry_workflow_span") - - # Exit isolation scope - if hasattr(agent, "_sentry_isolation_scope"): - isolation_scope.__exit__(None, None, None) - delattr(agent, "_sentry_isolation_scope") - - run_result.aclose = wrapped_aclose + # Store references for cleanup + run_result._sentry_workflow_span = workflow_span + run_result._sentry_agent = agent return run_result From c446c355cdda62dc72dcf97949c29f81128eee38 Mon Sep 17 00:00:00 2001 From: Fabian Schindler Date: Tue, 13 Jan 2026 11:06:41 +0100 Subject: [PATCH 3/9] fix: improving span data support for streaming responses --- .../openai_agents/patches/agent_run.py | 5 + .../openai_agents/patches/models.py | 51 ++++-- .../openai_agents/spans/__init__.py | 6 +- .../openai_agents/spans/ai_client.py | 36 +++- .../integrations/openai_agents/utils.py | 6 + .../openai_agents/test_openai_agents.py | 164 +++++++++++------- 6 files changed, 192 insertions(+), 76 deletions(-) diff --git a/sentry_sdk/integrations/openai_agents/patches/agent_run.py b/sentry_sdk/integrations/openai_agents/patches/agent_run.py index 0241ac784f..bd6f2e05d7 100644 --- a/sentry_sdk/integrations/openai_agents/patches/agent_run.py +++ b/sentry_sdk/integrations/openai_agents/patches/agent_run.py @@ -183,6 +183,7 @@ async def patched_run_single_turn_streamed( ) """ # Extract positional arguments (streaming version doesn't use keyword-only args) + streamed_result = args[0] if len(args) > 0 else kwargs.get("streamed_result") agent = args[1] if len(args) > 1 else kwargs.get("agent") context_wrapper = args[3] if len(args) > 3 else kwargs.get("context_wrapper") should_run_agent_start_hooks = ( @@ -199,11 +200,15 @@ async def patched_run_single_turn_streamed( end_invoke_agent_span(context_wrapper, current_agent) # Build kwargs dict for span creation (for compatibility with _start_invoke_agent_span) + # Include original_input from streamed_result for request messages span_kwargs = { "agent": agent, "context_wrapper": context_wrapper, "should_run_agent_start_hooks": should_run_agent_start_hooks, } + if streamed_result and hasattr(streamed_result, "input"): + span_kwargs["original_input"] = streamed_result.input + span = _start_invoke_agent_span(context_wrapper, agent, span_kwargs) span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True) agent._sentry_agent_span = span diff --git 
a/sentry_sdk/integrations/openai_agents/patches/models.py b/sentry_sdk/integrations/openai_agents/patches/models.py index 8099cfef89..bf369266b5 100644 --- a/sentry_sdk/integrations/openai_agents/patches/models.py +++ b/sentry_sdk/integrations/openai_agents/patches/models.py @@ -3,7 +3,11 @@ from sentry_sdk.integrations import DidNotEnable -from ..spans import ai_client_span, update_ai_client_span +from ..spans import ( + ai_client_span, + update_ai_client_span, + update_ai_client_span_streaming, +) from sentry_sdk.consts import SPANDATA from typing import TYPE_CHECKING @@ -37,15 +41,19 @@ def wrapped_get_model( # because we only patch its direct methods, all underlying data can remain unchanged. model = copy.copy(original_get_model(agent, run_config)) - # Wrap _fetch_response if it exists (for OpenAI models) to capture raw response model + # Capture the request model name for spans (agent.model can be None when using defaults) + request_model_name = model.model if hasattr(model, "model") else str(model) + agent._sentry_request_model = request_model_name + + # Wrap _fetch_response if it exists (for OpenAI models) to capture response model if hasattr(model, "_fetch_response"): original_fetch_response = model._fetch_response @wraps(original_fetch_response) async def wrapped_fetch_response(*args: "Any", **kwargs: "Any") -> "Any": response = await original_fetch_response(*args, **kwargs) - if hasattr(response, "model"): - agent._sentry_raw_response_model = str(response.model) + if hasattr(response, "model") and response.model: + agent._sentry_response_model = str(response.model) return response model._fetch_response = wrapped_fetch_response @@ -57,15 +65,17 @@ async def wrapped_get_response(*args: "Any", **kwargs: "Any") -> "Any": with ai_client_span(agent, kwargs) as span: result = await original_get_response(*args, **kwargs) - response_model = getattr(agent, "_sentry_raw_response_model", None) + # Get response model captured from _fetch_response + response_model = getattr(agent, "_sentry_response_model", None) if response_model: + # Set response model on agent span agent_span = getattr(agent, "_sentry_agent_span", None) if agent_span: agent_span.set_data( SPANDATA.GEN_AI_RESPONSE_MODEL, response_model ) - - delattr(agent, "_sentry_raw_response_model") + # Clean up after use + delattr(agent, "_sentry_response_model") update_ai_client_span(span, agent, kwargs, result, response_model) @@ -109,18 +119,27 @@ async def wrapped_stream_response(*args: "Any", **kwargs: "Any") -> "Any": with ai_client_span(agent, span_kwargs) as span: span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True) + streaming_response = None async for event in original_stream_response(*args, **kwargs): + # Capture the full response from ResponseCompletedEvent + if hasattr(event, "response"): + streaming_response = event.response yield event - # Get response model if captured - response_model = getattr(agent, "_sentry_raw_response_model", None) - if response_model: - agent_span = getattr(agent, "_sentry_agent_span", None) - if agent_span: - agent_span.set_data( - SPANDATA.GEN_AI_RESPONSE_MODEL, response_model - ) - delattr(agent, "_sentry_raw_response_model") + # Update span with response data (usage, output, model) + if streaming_response: + update_ai_client_span_streaming(span, agent, streaming_response) + # Also set response model on agent span + if ( + hasattr(streaming_response, "model") + and streaming_response.model + ): + agent_span = getattr(agent, "_sentry_agent_span", None) + if agent_span: + agent_span.set_data( + 
SPANDATA.GEN_AI_RESPONSE_MODEL, + str(streaming_response.model), + ) model.stream_response = wrapped_stream_response diff --git a/sentry_sdk/integrations/openai_agents/spans/__init__.py b/sentry_sdk/integrations/openai_agents/spans/__init__.py index 64b979fc25..af9c2c4e6e 100644 --- a/sentry_sdk/integrations/openai_agents/spans/__init__.py +++ b/sentry_sdk/integrations/openai_agents/spans/__init__.py @@ -1,5 +1,9 @@ from .agent_workflow import agent_workflow_span # noqa: F401 -from .ai_client import ai_client_span, update_ai_client_span # noqa: F401 +from .ai_client import ( + ai_client_span, + update_ai_client_span, + update_ai_client_span_streaming, +) # noqa: F401 from .execute_tool import execute_tool_span, update_execute_tool_span # noqa: F401 from .handoff import handoff_span # noqa: F401 from .invoke_agent import ( diff --git a/sentry_sdk/integrations/openai_agents/spans/ai_client.py b/sentry_sdk/integrations/openai_agents/spans/ai_client.py index 1e188aa097..16c0c441d0 100644 --- a/sentry_sdk/integrations/openai_agents/spans/ai_client.py +++ b/sentry_sdk/integrations/openai_agents/spans/ai_client.py @@ -21,7 +21,13 @@ def ai_client_span( agent: "Agent", get_response_kwargs: "dict[str, Any]" ) -> "sentry_sdk.tracing.Span": # TODO-anton: implement other types of operations. Now "chat" is hardcoded. - model_name = agent.model.model if hasattr(agent.model, "model") else agent.model + # Get model name from agent.model or fall back to request model (for when agent.model is None/default) + model_name = None + if agent.model: + model_name = agent.model.model if hasattr(agent.model, "model") else agent.model + elif hasattr(agent, "_sentry_request_model"): + model_name = agent._sentry_request_model + span = sentry_sdk.start_span( op=OP.GEN_AI_CHAT, description=f"chat {model_name}", @@ -50,3 +56,31 @@ def update_ai_client_span( # Set response model if captured from raw response if response_model is not None: span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, response_model) + + +def update_ai_client_span_streaming( + span: "sentry_sdk.tracing.Span", + agent: "Agent", + response: "Any", +) -> None: + """ + Update AI client span with data from a streaming response. 
+ The streaming response has a different structure than the non-streaming response: + - response.usage contains usage data + - response.output contains output items (similar to result.output) + - response.model contains the response model + """ + if hasattr(response, "usage") and response.usage: + _set_usage_data(span, response.usage) + + # For streaming, set output data from the response + if hasattr(response, "output"): + _set_output_data(span, response) + + # Create MCP tool spans if applicable + if hasattr(response, "output"): + _create_mcp_execute_tool_spans(span, response) + + # Set response model + if hasattr(response, "model") and response.model: + span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, str(response.model)) diff --git a/sentry_sdk/integrations/openai_agents/utils.py b/sentry_sdk/integrations/openai_agents/utils.py index a24d0e909d..a77d9f46ea 100644 --- a/sentry_sdk/integrations/openai_agents/utils.py +++ b/sentry_sdk/integrations/openai_agents/utils.py @@ -63,8 +63,14 @@ def _set_agent_data(span: "sentry_sdk.tracing.Span", agent: "agents.Agent") -> N SPANDATA.GEN_AI_REQUEST_MAX_TOKENS, agent.model_settings.max_tokens ) + # Get model name from agent.model or fall back to request model (for when agent.model is None/default) + model_name = None if agent.model: model_name = agent.model.model if hasattr(agent.model, "model") else agent.model + elif hasattr(agent, "_sentry_request_model"): + model_name = agent._sentry_request_model + + if model_name: span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model_name) if agent.model_settings.presence_penalty: diff --git a/tests/integrations/openai_agents/test_openai_agents.py b/tests/integrations/openai_agents/test_openai_agents.py index 85e71b3da1..2bf41ceba9 100644 --- a/tests/integrations/openai_agents/test_openai_agents.py +++ b/tests/integrations/openai_agents/test_openai_agents.py @@ -1465,7 +1465,7 @@ async def test_ai_client_span_includes_response_model( ): """ Test that ai_client spans (gen_ai.chat) include the response model from the actual API response. - This verifies the new functionality to capture the model used in the response. + This verifies we capture the actual model used (which may differ from the requested model). 
""" with patch.dict(os.environ, {"OPENAI_API_KEY": "test-key"}): @@ -1473,7 +1473,7 @@ async def test_ai_client_span_includes_response_model( with patch( "agents.models.openai_responses.OpenAIResponsesModel._fetch_response" ) as mock_fetch_response: - # Create a mock OpenAI Response object with a model field + # Create a mock OpenAI Response object with a specific model version mock_response = MagicMock() mock_response.model = "gpt-4.1-2025-04-14" # The actual response model mock_response.id = "resp_123" @@ -1523,7 +1523,7 @@ async def test_ai_client_span_includes_response_model( spans = transaction["spans"] _, ai_client_span = spans - # Verify ai_client span has response model + # Verify ai_client span has response model from API response assert ai_client_span["description"] == "chat gpt-4" assert "gen_ai.response.model" in ai_client_span["data"] assert ai_client_span["data"]["gen_ai.response.model"] == "gpt-4.1-2025-04-14" @@ -1545,13 +1545,13 @@ async def test_ai_client_span_response_model_with_chat_completions( ) with patch.dict(os.environ, {"OPENAI_API_KEY": "test-key"}): - # Mock the get_response method directly since ChatCompletions may use Responses API anyway + # Mock the _fetch_response method with patch( "agents.models.openai_responses.OpenAIResponsesModel._fetch_response" ) as mock_fetch_response: - # Create a mock Response object with a model field + # Create a mock Response object mock_response = MagicMock() - mock_response.model = "gpt-4o-mini-2024-07-18" # Actual response model + mock_response.model = "gpt-4o-mini-2024-07-18" mock_response.id = "resp_123" mock_response.output = [ ResponseOutputMessage( @@ -1598,7 +1598,7 @@ async def test_ai_client_span_response_model_with_chat_completions( spans = transaction["spans"] _, ai_client_span = spans - # Verify response model from Response is captured + # Verify response model from API response is captured assert "gen_ai.response.model" in ai_client_span["data"] assert ai_client_span["data"]["gen_ai.response.model"] == "gpt-4o-mini-2024-07-18" @@ -1711,41 +1711,45 @@ async def test_response_model_not_set_when_unavailable( sentry_init, capture_events, test_agent ): """ - Test that response model is not set if the raw response doesn't have a model field. - This can happen with custom model implementations. + Test that response model is not set if the API response doesn't have a model field. + The request model should still be set correctly. 
""" with patch.dict(os.environ, {"OPENAI_API_KEY": "test-key"}): - # Mock without _fetch_response (simulating custom model without this method) with patch( - "agents.models.openai_responses.OpenAIResponsesModel.get_response" - ) as mock_get_response: - response = ModelResponse( - output=[ - ResponseOutputMessage( - id="msg_123", - type="message", - status="completed", - content=[ - ResponseOutputText( - text="Response without model field", - type="output_text", - annotations=[], - ) - ], - role="assistant", - ) - ], - usage=Usage( - requests=1, - input_tokens=10, - output_tokens=20, - total_tokens=30, - ), - response_id="resp_123", + "agents.models.openai_responses.OpenAIResponsesModel._fetch_response" + ) as mock_fetch_response: + # Create a mock response without a model field + mock_response = MagicMock() + mock_response.model = None # No model in response + mock_response.id = "resp_123" + mock_response.output = [ + ResponseOutputMessage( + id="msg_123", + type="message", + status="completed", + content=[ + ResponseOutputText( + text="Response without model field", + type="output_text", + annotations=[], + ) + ], + role="assistant", + ) + ] + mock_response.usage = MagicMock() + mock_response.usage.input_tokens = 10 + mock_response.usage.output_tokens = 20 + mock_response.usage.total_tokens = 30 + mock_response.usage.input_tokens_details = InputTokensDetails( + cached_tokens=0 ) - # Don't set _sentry_response_model attribute - mock_get_response.return_value = response + mock_response.usage.output_tokens_details = OutputTokensDetails( + reasoning_tokens=0 + ) + + mock_fetch_response.return_value = mock_response sentry_init( integrations=[OpenAIAgentsIntegration()], @@ -1754,25 +1758,21 @@ async def test_response_model_not_set_when_unavailable( events = capture_events() - # Remove the _fetch_response method to simulate custom model - with patch.object( - agents.models.openai_responses.OpenAIResponsesModel, - "_fetch_response", - None, - ): - result = await agents.Runner.run( - test_agent, "Test input", run_config=test_run_config - ) + result = await agents.Runner.run( + test_agent, "Test input", run_config=test_run_config + ) - assert result is not None + assert result is not None (transaction,) = events spans = transaction["spans"] _, ai_client_span = spans - # When response model can't be captured, it shouldn't be in the span data - # (we only set it when we can accurately capture it) + # Response model should NOT be set when API doesn't return it assert "gen_ai.response.model" not in ai_client_span["data"] + # But request model should still be set + assert "gen_ai.request.model" in ai_client_span["data"] + assert ai_client_span["data"]["gen_ai.request.model"] == "gpt-4" @pytest.mark.asyncio @@ -1780,15 +1780,14 @@ async def test_invoke_agent_span_includes_response_model( sentry_init, capture_events, test_agent ): """ - Test that invoke_agent spans include the response model. - When an agent makes multiple LLM calls, it should report the last model used. + Test that invoke_agent spans include the response model from the API response. 
""" with patch.dict(os.environ, {"OPENAI_API_KEY": "test-key"}): with patch( "agents.models.openai_responses.OpenAIResponsesModel._fetch_response" ) as mock_fetch_response: - # Create a mock OpenAI Response object with a model field + # Create a mock OpenAI Response object with a specific model version mock_response = MagicMock() mock_response.model = "gpt-4.1-2025-04-14" # The actual response model mock_response.id = "resp_123" @@ -1838,7 +1837,7 @@ async def test_invoke_agent_span_includes_response_model( spans = transaction["spans"] invoke_agent_span, ai_client_span = spans - # Verify invoke_agent span has response model + # Verify invoke_agent span has response model from API assert invoke_agent_span["description"] == "invoke_agent test_agent" assert "gen_ai.response.model" in invoke_agent_span["data"] assert invoke_agent_span["data"]["gen_ai.response.model"] == "gpt-4.1-2025-04-14" @@ -1868,7 +1867,7 @@ def calculator(a: int, b: int) -> int: with patch( "agents.models.openai_responses.OpenAIResponsesModel._fetch_response" ) as mock_fetch_response: - # First call: gpt-4 model + # First call: gpt-4 model returns tool call first_response = MagicMock() first_response.model = "gpt-4-0613" first_response.id = "resp_1" @@ -1892,9 +1891,9 @@ def calculator(a: int, b: int) -> int: reasoning_tokens=0 ) - # Second call: different model (e.g., after tool execution) + # Second call: different model version returns final message second_response = MagicMock() - second_response.model = "gpt-4.1-2025-04-14" # Different model + second_response.model = "gpt-4.1-2025-04-14" second_response.id = "resp_2" second_response.output = [ ResponseOutputMessage( @@ -1946,11 +1945,11 @@ def calculator(a: int, b: int) -> int: first_ai_client_span = spans[1] second_ai_client_span = spans[3] # After tool span - # Verify invoke_agent span uses the LAST response model + # Invoke_agent span uses the LAST response model assert "gen_ai.response.model" in invoke_agent_span["data"] assert invoke_agent_span["data"]["gen_ai.response.model"] == "gpt-4.1-2025-04-14" - # Verify each ai_client span has its own response model + # Each ai_client span has its own response model from the API assert first_ai_client_span["data"]["gen_ai.response.model"] == "gpt-4-0613" assert ( second_ai_client_span["data"]["gen_ai.response.model"] == "gpt-4.1-2025-04-14" @@ -2019,3 +2018,52 @@ def test_streaming_patches_applied(sentry_init): # Verify the patches were applied by checking for our wrapper run_streamed_func = agents.run.DEFAULT_AGENT_RUNNER.run_streamed assert run_streamed_func is not None + + +@pytest.mark.asyncio +async def test_streaming_span_update_captures_response_data( + sentry_init, test_agent, mock_usage +): + """ + Test that update_ai_client_span_streaming correctly captures response text, + usage data, and response model from a streaming response. 
+ """ + from sentry_sdk.integrations.openai_agents.spans.ai_client import ( + update_ai_client_span_streaming, + ) + + sentry_init( + integrations=[OpenAIAgentsIntegration()], + traces_sample_rate=1.0, + send_default_pii=True, + ) + + # Create a mock streaming response object (similar to what we'd get from ResponseCompletedEvent) + mock_streaming_response = MagicMock() + mock_streaming_response.model = "gpt-4-streaming" + mock_streaming_response.usage = mock_usage + mock_streaming_response.output = [ + ResponseOutputMessage( + id="msg_streaming_123", + type="message", + status="completed", + content=[ + ResponseOutputText( + text="Hello from streaming!", + type="output_text", + annotations=[], + ) + ], + role="assistant", + ) + ] + + # Test the update function directly + with start_span(op="gen_ai.chat", description="test chat") as span: + update_ai_client_span_streaming(span, test_agent, mock_streaming_response) + + # Verify the span data was set correctly + assert span._data["gen_ai.response.text"] == "Hello from streaming!" + assert span._data["gen_ai.usage.input_tokens"] == 10 + assert span._data["gen_ai.usage.output_tokens"] == 20 + assert span._data["gen_ai.response.model"] == "gpt-4-streaming" From ca2b9ba98eb6ca513419f2c6ac0a92c4e817a314 Mon Sep 17 00:00:00 2001 From: Fabian Schindler Date: Tue, 13 Jan 2026 12:24:34 +0100 Subject: [PATCH 4/9] rev: reverting mcp --- sentry_sdk/integrations/mcp.py | 29 ++--------------------------- 1 file changed, 2 insertions(+), 27 deletions(-) diff --git a/sentry_sdk/integrations/mcp.py b/sentry_sdk/integrations/mcp.py index a8dc78f18e..47fda272b7 100644 --- a/sentry_sdk/integrations/mcp.py +++ b/sentry_sdk/integrations/mcp.py @@ -7,7 +7,6 @@ Supports the low-level `mcp.server.lowlevel.Server` API. """ -from contextlib import contextmanager import inspect from functools import wraps from typing import TYPE_CHECKING @@ -353,30 +352,6 @@ def _prepare_handler_data( ) -@contextmanager -def ensure_span(*args, **kwargs): - """Ensure a span is created for the current context.""" - - current_span = sentry_sdk.get_current_span() - transaction_exists = ( - current_span is not None and current_span.containing_transaction is not None - ) - - if transaction_exists: - with sentry_sdk.start_span(*args, **kwargs) as span: - yield span - else: - with sentry_sdk.start_transaction(*args, **kwargs): - with sentry_sdk.start_span(*args, **kwargs) as span: - yield span - # with get_start_span_function()( - # op=OP.MCP_SERVER, - # name=span_name, - # origin=MCPIntegration.origin, - # ) as span: - # yield span - - async def _async_handler_wrapper( handler_type: str, func: "Callable[..., Any]", @@ -407,7 +382,7 @@ async def _async_handler_wrapper( ) = _prepare_handler_data(handler_type, original_args, original_kwargs) # Start span and execute - with ensure_span( + with get_start_span_function()( op=OP.MCP_SERVER, name=span_name, origin=MCPIntegration.origin, @@ -479,7 +454,7 @@ def _sync_handler_wrapper( ) = _prepare_handler_data(handler_type, original_args) # Start span and execute - with ensure_span( + with get_start_span_function()( op=OP.MCP_SERVER, name=span_name, origin=MCPIntegration.origin, From e5d0ffac50f6a8e19c058de8ed80d9ec687eafa5 Mon Sep 17 00:00:00 2001 From: Fabian Schindler Date: Tue, 13 Jan 2026 12:32:35 +0100 Subject: [PATCH 5/9] feat: deduplicating code --- .../openai_agents/patches/agent_run.py | 98 +++++++++---------- .../openai_agents/patches/models.py | 49 +++++----- .../openai_agents/spans/__init__.py | 6 +- .../openai_agents/spans/ai_client.py | 
42 +++----- .../openai_agents/test_openai_agents.py | 8 +- 5 files changed, 89 insertions(+), 114 deletions(-) diff --git a/sentry_sdk/integrations/openai_agents/patches/agent_run.py b/sentry_sdk/integrations/openai_agents/patches/agent_run.py index bd6f2e05d7..73572f705b 100644 --- a/sentry_sdk/integrations/openai_agents/patches/agent_run.py +++ b/sentry_sdk/integrations/openai_agents/patches/agent_run.py @@ -36,19 +36,6 @@ def _patch_agent_run() -> None: original_execute_handoffs = agents._run_impl.RunImpl.execute_handoffs original_execute_final_output = agents._run_impl.RunImpl.execute_final_output - def _start_invoke_agent_span( - context_wrapper: "agents.RunContextWrapper", - agent: "agents.Agent", - kwargs: "dict[str, Any]", - ) -> "Span": - """Start an agent invocation span""" - # Store the agent on the context wrapper so we can access it later - context_wrapper._sentry_current_agent = agent - span = invoke_agent_span(context_wrapper, agent, kwargs) - context_wrapper._sentry_agent_span = span - - return span - def _has_active_agent_span(context_wrapper: "agents.RunContextWrapper") -> bool: """Check if there's an active agent span for this context""" return getattr(context_wrapper, "_sentry_current_agent", None) is not None @@ -59,6 +46,39 @@ def _get_current_agent( """Get the current agent from context wrapper""" return getattr(context_wrapper, "_sentry_current_agent", None) + def _maybe_start_agent_span( + context_wrapper: "agents.RunContextWrapper", + agent: "agents.Agent", + should_run_agent_start_hooks: bool, + span_kwargs: "dict[str, Any]", + is_streaming: bool = False, + ) -> "Optional[Span]": + """ + Start an agent invocation span if conditions are met. + Handles ending any existing span for a different agent. + + Returns the new span if started, or the existing span if conditions aren't met. 
+ """ + if not (should_run_agent_start_hooks and agent and context_wrapper): + return getattr(context_wrapper, "_sentry_agent_span", None) + + # End any existing span for a different agent + if _has_active_agent_span(context_wrapper): + current_agent = _get_current_agent(context_wrapper) + if current_agent and current_agent != agent: + end_invoke_agent_span(context_wrapper, current_agent) + + # Store the agent on the context wrapper so we can access it later + context_wrapper._sentry_current_agent = agent + span = invoke_agent_span(context_wrapper, agent, span_kwargs) + context_wrapper._sentry_agent_span = span + agent._sentry_agent_span = span + + if is_streaming: + span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True) + + return span + @wraps( original_run_single_turn.__func__ if hasattr(original_run_single_turn, "__func__") @@ -72,26 +92,16 @@ async def patched_run_single_turn( context_wrapper = kwargs.get("context_wrapper") should_run_agent_start_hooks = kwargs.get("should_run_agent_start_hooks") - span = getattr(context_wrapper, "_sentry_agent_span", None) - # Start agent span when agent starts (but only once per agent) - if should_run_agent_start_hooks and agent and context_wrapper: - # End any existing span for a different agent - if _has_active_agent_span(context_wrapper): - current_agent = _get_current_agent(context_wrapper) - if current_agent and current_agent != agent: - end_invoke_agent_span(context_wrapper, current_agent) - - span = _start_invoke_agent_span(context_wrapper, agent, kwargs) - agent._sentry_agent_span = span + span = _maybe_start_agent_span( + context_wrapper, agent, should_run_agent_start_hooks, kwargs + ) - # Call original method with all the correct parameters try: result = await original_run_single_turn(*args, **kwargs) except Exception as exc: if span is not None and span.timestamp is None: _record_exception_on_span(span, exc) end_invoke_agent_span(context_wrapper, agent) - reraise(*sys.exc_info()) return result @@ -190,37 +200,25 @@ async def patched_run_single_turn_streamed( args[5] if len(args) > 5 else kwargs.get("should_run_agent_start_hooks") ) - span = getattr(context_wrapper, "_sentry_agent_span", None) - # Start agent span when agent starts (but only once per agent) - if should_run_agent_start_hooks and agent and context_wrapper: - # End any existing span for a different agent - if _has_active_agent_span(context_wrapper): - current_agent = _get_current_agent(context_wrapper) - if current_agent and current_agent != agent: - end_invoke_agent_span(context_wrapper, current_agent) - - # Build kwargs dict for span creation (for compatibility with _start_invoke_agent_span) - # Include original_input from streamed_result for request messages - span_kwargs = { - "agent": agent, - "context_wrapper": context_wrapper, - "should_run_agent_start_hooks": should_run_agent_start_hooks, - } - if streamed_result and hasattr(streamed_result, "input"): - span_kwargs["original_input"] = streamed_result.input - - span = _start_invoke_agent_span(context_wrapper, agent, span_kwargs) - span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True) - agent._sentry_agent_span = span + # Build span kwargs with original_input from streamed_result for request messages + span_kwargs: "dict[str, Any]" = {} + if streamed_result and hasattr(streamed_result, "input"): + span_kwargs["original_input"] = streamed_result.input + + span = _maybe_start_agent_span( + context_wrapper, + agent, + should_run_agent_start_hooks, + span_kwargs, + is_streaming=True, + ) - # Call original streaming 
method try: result = await original_run_single_turn_streamed(*args, **kwargs) except Exception as exc: if span is not None and span.timestamp is None: _record_exception_on_span(span, exc) end_invoke_agent_span(context_wrapper, agent) - reraise(*sys.exc_info()) return result diff --git a/sentry_sdk/integrations/openai_agents/patches/models.py b/sentry_sdk/integrations/openai_agents/patches/models.py index bf369266b5..2601d7cd35 100644 --- a/sentry_sdk/integrations/openai_agents/patches/models.py +++ b/sentry_sdk/integrations/openai_agents/patches/models.py @@ -3,18 +3,13 @@ from sentry_sdk.integrations import DidNotEnable -from ..spans import ( - ai_client_span, - update_ai_client_span, - update_ai_client_span_streaming, -) +from ..spans import ai_client_span, update_ai_client_span from sentry_sdk.consts import SPANDATA from typing import TYPE_CHECKING if TYPE_CHECKING: - from typing import Any, Callable - + from typing import Any, Callable, Optional try: import agents @@ -22,6 +17,16 @@ raise DidNotEnable("OpenAI Agents not installed") +def _set_response_model_on_agent_span( + agent: "agents.Agent", response_model: "Optional[str]" +) -> None: + """Set the response model on the agent's invoke_agent span if available.""" + if response_model: + agent_span = getattr(agent, "_sentry_agent_span", None) + if agent_span: + agent_span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, response_model) + + def _create_get_model_wrapper( original_get_model: "Callable[..., Any]", ) -> "Callable[..., Any]": @@ -65,19 +70,13 @@ async def wrapped_get_response(*args: "Any", **kwargs: "Any") -> "Any": with ai_client_span(agent, kwargs) as span: result = await original_get_response(*args, **kwargs) - # Get response model captured from _fetch_response + # Get response model captured from _fetch_response and clean up response_model = getattr(agent, "_sentry_response_model", None) if response_model: - # Set response model on agent span - agent_span = getattr(agent, "_sentry_agent_span", None) - if agent_span: - agent_span.set_data( - SPANDATA.GEN_AI_RESPONSE_MODEL, response_model - ) - # Clean up after use delattr(agent, "_sentry_response_model") - update_ai_client_span(span, agent, kwargs, result, response_model) + _set_response_model_on_agent_span(agent, response_model) + update_ai_client_span(span, result, response_model) return result @@ -128,18 +127,14 @@ async def wrapped_stream_response(*args: "Any", **kwargs: "Any") -> "Any": # Update span with response data (usage, output, model) if streaming_response: - update_ai_client_span_streaming(span, agent, streaming_response) - # Also set response model on agent span - if ( - hasattr(streaming_response, "model") + response_model = ( + str(streaming_response.model) + if hasattr(streaming_response, "model") and streaming_response.model - ): - agent_span = getattr(agent, "_sentry_agent_span", None) - if agent_span: - agent_span.set_data( - SPANDATA.GEN_AI_RESPONSE_MODEL, - str(streaming_response.model), - ) + else None + ) + _set_response_model_on_agent_span(agent, response_model) + update_ai_client_span(span, streaming_response) model.stream_response = wrapped_stream_response diff --git a/sentry_sdk/integrations/openai_agents/spans/__init__.py b/sentry_sdk/integrations/openai_agents/spans/__init__.py index af9c2c4e6e..64b979fc25 100644 --- a/sentry_sdk/integrations/openai_agents/spans/__init__.py +++ b/sentry_sdk/integrations/openai_agents/spans/__init__.py @@ -1,9 +1,5 @@ from .agent_workflow import agent_workflow_span # noqa: F401 -from .ai_client import ( - 
ai_client_span, - update_ai_client_span, - update_ai_client_span_streaming, -) # noqa: F401 +from .ai_client import ai_client_span, update_ai_client_span # noqa: F401 from .execute_tool import execute_tool_span, update_execute_tool_span # noqa: F401 from .handoff import handoff_span # noqa: F401 from .invoke_agent import ( diff --git a/sentry_sdk/integrations/openai_agents/spans/ai_client.py b/sentry_sdk/integrations/openai_agents/spans/ai_client.py index 16c0c441d0..aa25d0e9d0 100644 --- a/sentry_sdk/integrations/openai_agents/spans/ai_client.py +++ b/sentry_sdk/integrations/openai_agents/spans/ai_client.py @@ -44,43 +44,29 @@ def ai_client_span( def update_ai_client_span( span: "sentry_sdk.tracing.Span", - agent: "Agent", - get_response_kwargs: "dict[str, Any]", - result: "Any", - response_model: "Optional[str]" = None, -) -> None: - _set_usage_data(span, result.usage) - _set_output_data(span, result) - _create_mcp_execute_tool_spans(span, result) - - # Set response model if captured from raw response - if response_model is not None: - span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, response_model) - - -def update_ai_client_span_streaming( - span: "sentry_sdk.tracing.Span", - agent: "Agent", response: "Any", + response_model: "Optional[str]" = None, ) -> None: """ - Update AI client span with data from a streaming response. - The streaming response has a different structure than the non-streaming response: - - response.usage contains usage data - - response.output contains output items (similar to result.output) - - response.model contains the response model + Update AI client span with response data. + Works for both streaming and non-streaming responses. + + Args: + span: The span to update + response: The response object (ModelResponse for non-streaming, Response for streaming) + response_model: Optional response model string (used when captured from raw API response) """ + # Set usage data if available if hasattr(response, "usage") and response.usage: _set_usage_data(span, response.usage) - # For streaming, set output data from the response + # Set output data and create MCP tool spans if available if hasattr(response, "output"): _set_output_data(span, response) - - # Create MCP tool spans if applicable - if hasattr(response, "output"): _create_mcp_execute_tool_spans(span, response) - # Set response model - if hasattr(response, "model") and response.model: + # Set response model - prefer explicit response_model, fall back to response.model + if response_model is not None: + span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, response_model) + elif hasattr(response, "model") and response.model: span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, str(response.model)) diff --git a/tests/integrations/openai_agents/test_openai_agents.py b/tests/integrations/openai_agents/test_openai_agents.py index 2bf41ceba9..89798775a1 100644 --- a/tests/integrations/openai_agents/test_openai_agents.py +++ b/tests/integrations/openai_agents/test_openai_agents.py @@ -2025,11 +2025,11 @@ async def test_streaming_span_update_captures_response_data( sentry_init, test_agent, mock_usage ): """ - Test that update_ai_client_span_streaming correctly captures response text, + Test that update_ai_client_span correctly captures response text, usage data, and response model from a streaming response. 
""" from sentry_sdk.integrations.openai_agents.spans.ai_client import ( - update_ai_client_span_streaming, + update_ai_client_span, ) sentry_init( @@ -2058,9 +2058,9 @@ async def test_streaming_span_update_captures_response_data( ) ] - # Test the update function directly + # Test the unified update function (works for both streaming and non-streaming) with start_span(op="gen_ai.chat", description="test chat") as span: - update_ai_client_span_streaming(span, test_agent, mock_streaming_response) + update_ai_client_span(span, mock_streaming_response) # Verify the span data was set correctly assert span._data["gen_ai.response.text"] == "Hello from streaming!" From 5f73e4f11fcffd44002642c40e426281a99a93d1 Mon Sep 17 00:00:00 2001 From: Fabian Schindler Date: Tue, 13 Jan 2026 12:36:20 +0100 Subject: [PATCH 6/9] fix: mypy lint issues --- .../integrations/openai_agents/patches/agent_run.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sentry_sdk/integrations/openai_agents/patches/agent_run.py b/sentry_sdk/integrations/openai_agents/patches/agent_run.py index 73572f705b..80e06a01f8 100644 --- a/sentry_sdk/integrations/openai_agents/patches/agent_run.py +++ b/sentry_sdk/integrations/openai_agents/patches/agent_run.py @@ -90,7 +90,7 @@ async def patched_run_single_turn( """Patched _run_single_turn that creates agent invocation spans""" agent = kwargs.get("agent") context_wrapper = kwargs.get("context_wrapper") - should_run_agent_start_hooks = kwargs.get("should_run_agent_start_hooks") + should_run_agent_start_hooks = kwargs.get("should_run_agent_start_hooks", False) span = _maybe_start_agent_span( context_wrapper, agent, should_run_agent_start_hooks, kwargs @@ -196,8 +196,10 @@ async def patched_run_single_turn_streamed( streamed_result = args[0] if len(args) > 0 else kwargs.get("streamed_result") agent = args[1] if len(args) > 1 else kwargs.get("agent") context_wrapper = args[3] if len(args) > 3 else kwargs.get("context_wrapper") - should_run_agent_start_hooks = ( - args[5] if len(args) > 5 else kwargs.get("should_run_agent_start_hooks") + should_run_agent_start_hooks = bool( + args[5] + if len(args) > 5 + else kwargs.get("should_run_agent_start_hooks", False) ) # Build span kwargs with original_input from streamed_result for request messages From b09e718ad06c268d4db69577de73e4d093ae4a3a Mon Sep 17 00:00:00 2001 From: Fabian Schindler Date: Tue, 13 Jan 2026 12:54:22 +0100 Subject: [PATCH 7/9] fix: several review comments --- .../openai_agents/patches/agent_run.py | 2 +- .../openai_agents/patches/models.py | 17 ++++++++++++++--- .../openai_agents/spans/ai_client.py | 2 +- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/sentry_sdk/integrations/openai_agents/patches/agent_run.py b/sentry_sdk/integrations/openai_agents/patches/agent_run.py index 80e06a01f8..70d20783e3 100644 --- a/sentry_sdk/integrations/openai_agents/patches/agent_run.py +++ b/sentry_sdk/integrations/openai_agents/patches/agent_run.py @@ -163,7 +163,7 @@ async def patched_execute_final_output( # (For non-streaming, the workflow span is closed by the context manager in _create_run_wrapper) if agent and hasattr(agent, "_sentry_workflow_span"): workflow_span = agent._sentry_workflow_span - workflow_span.__exit__(None, None, None) + workflow_span.__exit__(*sys.exc_info()) delattr(agent, "_sentry_workflow_span") return result diff --git a/sentry_sdk/integrations/openai_agents/patches/models.py b/sentry_sdk/integrations/openai_agents/patches/models.py index 2601d7cd35..fc0389d485 100644 --- 
a/sentry_sdk/integrations/openai_agents/patches/models.py +++ b/sentry_sdk/integrations/openai_agents/patches/models.py @@ -92,6 +92,11 @@ async def wrapped_stream_response(*args: "Any", **kwargs: "Any") -> "Any": Wrap stream_response to create an AI client span for streaming. stream_response is an async generator, so we yield events within the span. + Note: We use explicit try/finally instead of a context manager because + if the consumer abandons the stream (breaks early, network error, etc.), + the context manager's __exit__ may not be called. With try/finally, + cleanup happens even when GeneratorExit is thrown. + Note: stream_response is called with positional args unlike get_response which uses keyword args. The signature is: stream_response( @@ -108,6 +113,8 @@ async def wrapped_stream_response(*args: "Any", **kwargs: "Any") -> "Any": prompt, ) """ + import sys + # Build kwargs dict from positional args for span data capture span_kwargs = dict(kwargs) if len(args) > 0: @@ -115,10 +122,12 @@ async def wrapped_stream_response(*args: "Any", **kwargs: "Any") -> "Any": if len(args) > 1: span_kwargs["input"] = args[1] - with ai_client_span(agent, span_kwargs) as span: - span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True) + span = ai_client_span(agent, span_kwargs) + span.__enter__() + span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True) - streaming_response = None + streaming_response = None + try: async for event in original_stream_response(*args, **kwargs): # Capture the full response from ResponseCompletedEvent if hasattr(event, "response"): @@ -135,6 +144,8 @@ async def wrapped_stream_response(*args: "Any", **kwargs: "Any") -> "Any": ) _set_response_model_on_agent_span(agent, response_model) update_ai_client_span(span, streaming_response) + finally: + span.__exit__(*sys.exc_info()) model.stream_response = wrapped_stream_response diff --git a/sentry_sdk/integrations/openai_agents/spans/ai_client.py b/sentry_sdk/integrations/openai_agents/spans/ai_client.py index aa25d0e9d0..85fc667515 100644 --- a/sentry_sdk/integrations/openai_agents/spans/ai_client.py +++ b/sentry_sdk/integrations/openai_agents/spans/ai_client.py @@ -61,7 +61,7 @@ def update_ai_client_span( _set_usage_data(span, response.usage) # Set output data and create MCP tool spans if available - if hasattr(response, "output"): + if hasattr(response, "output") and response.output: _set_output_data(span, response) _create_mcp_execute_tool_spans(span, response) From 635606f9f6e7647dbe2ce45bc2e119d39f5ebe85 Mon Sep 17 00:00:00 2001 From: Fabian Schindler Date: Tue, 13 Jan 2026 13:14:22 +0100 Subject: [PATCH 8/9] fix: missing workflow span closing --- .../openai_agents/patches/agent_run.py | 23 +++++++++++++++---- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/sentry_sdk/integrations/openai_agents/patches/agent_run.py b/sentry_sdk/integrations/openai_agents/patches/agent_run.py index 70d20783e3..8290b8ae59 100644 --- a/sentry_sdk/integrations/openai_agents/patches/agent_run.py +++ b/sentry_sdk/integrations/openai_agents/patches/agent_run.py @@ -46,6 +46,19 @@ def _get_current_agent( """Get the current agent from context wrapper""" return getattr(context_wrapper, "_sentry_current_agent", None) + def _close_workflow_span_on_error(agent: "Optional[agents.Agent]") -> None: + """ + Close the workflow span on error for streaming executions. 
From 635606f9e7647dbe2ce45bc2e119d39f5ebe85 Mon Sep 17 00:00:00 2001
From: Fabian Schindler
Date: Tue, 13 Jan 2026 13:14:22 +0100
Subject: [PATCH 8/9] fix: missing workflow span closing

---
 .../openai_agents/patches/agent_run.py | 23 ++++++++++++++++++-----
 1 file changed, 18 insertions(+), 5 deletions(-)

diff --git a/sentry_sdk/integrations/openai_agents/patches/agent_run.py b/sentry_sdk/integrations/openai_agents/patches/agent_run.py
index 70d20783e3..8290b8ae59 100644
--- a/sentry_sdk/integrations/openai_agents/patches/agent_run.py
+++ b/sentry_sdk/integrations/openai_agents/patches/agent_run.py
@@ -46,6 +46,19 @@ def _get_current_agent(
         """Get the current agent from context wrapper"""
         return getattr(context_wrapper, "_sentry_current_agent", None)
 
+    def _close_workflow_span_on_error(agent: "Optional[agents.Agent]") -> None:
+        """
+        Close the workflow span on error for streaming executions.
+
+        This ensures the workflow span is properly closed when an exception
+        occurs in _run_single_turn_streamed or execute_handoffs, preventing
+        resource leaks and ensuring accurate telemetry.
+        """
+        if agent and hasattr(agent, "_sentry_workflow_span"):
+            workflow_span = agent._sentry_workflow_span
+            workflow_span.__exit__(*sys.exc_info())
+            delattr(agent, "_sentry_workflow_span")
+
     def _maybe_start_agent_span(
         context_wrapper: "agents.RunContextWrapper",
         agent: "agents.Agent",
@@ -129,7 +142,9 @@ async def patched_execute_handoffs(
         # Call original method with all parameters
         try:
             result = await original_execute_handoffs(*args, **kwargs)
-
+        except Exception:
+            _close_workflow_span_on_error(agent)
+            raise
         finally:
             # End span for current agent after handoff processing is complete
             if agent and context_wrapper and _has_active_agent_span(context_wrapper):
@@ -161,10 +176,7 @@ async def patched_execute_final_output(
 
         # For streaming: close the workflow span if it exists
         # (For non-streaming, the workflow span is closed by the context manager in _create_run_wrapper)
-        if agent and hasattr(agent, "_sentry_workflow_span"):
-            workflow_span = agent._sentry_workflow_span
-            workflow_span.__exit__(*sys.exc_info())
-            delattr(agent, "_sentry_workflow_span")
+        _close_workflow_span_on_error(agent)
 
         return result
 
@@ -221,6 +233,7 @@ async def patched_run_single_turn_streamed(
             if span is not None and span.timestamp is None:
                 _record_exception_on_span(span, exc)
                 end_invoke_agent_span(context_wrapper, agent)
+            _close_workflow_span_on_error(agent)
 
             reraise(*sys.exc_info())
 
         return result
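The helper PATCH 8/9 adds is subtler than it looks: calling `__exit__(*sys.exc_info())` from inside an `except` block forwards the live exception to the span, whereas `__exit__(None, None, None)` would close it as successful. A toy version under that assumption; `Span`, `Agent`, and `close_workflow_span` are stand-ins, not SDK names:

    import sys

    class Span:
        # Stand-in span: remembers whether it was closed cleanly.
        def __init__(self):
            self.status = None

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            self.status = "internal_error" if exc_type is not None else "ok"

    def close_workflow_span(owner):
        # Mirrors _close_workflow_span_on_error: forward the in-flight
        # exception (if any) into __exit__ so the span status reflects it.
        if hasattr(owner, "_workflow_span"):
            owner._workflow_span.__exit__(*sys.exc_info())
            del owner._workflow_span

    class Agent:
        pass

    agent = Agent()
    agent._workflow_span = span = Span().__enter__()
    try:
        raise RuntimeError("handoff failed")
    except RuntimeError:
        close_workflow_span(agent)  # sys.exc_info() is populated here
    assert span.status == "internal_error"

    # Outside any handler, the same helper closes the span as successful:
    agent._workflow_span = span2 = Span().__enter__()
    close_workflow_span(agent)
    assert span2.status == "ok"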
- """ + def _close_streaming_workflow_span(agent: "Optional[agents.Agent]") -> None: + """Close the workflow span for streaming executions if it exists.""" if agent and hasattr(agent, "_sentry_workflow_span"): workflow_span = agent._sentry_workflow_span workflow_span.__exit__(*sys.exc_info()) @@ -143,7 +137,7 @@ async def patched_execute_handoffs( try: result = await original_execute_handoffs(*args, **kwargs) except Exception: - _close_workflow_span_on_error(agent) + _close_streaming_workflow_span(agent) raise finally: # End span for current agent after handoff processing is complete @@ -166,17 +160,13 @@ async def patched_execute_final_output( context_wrapper = kwargs.get("context_wrapper") final_output = kwargs.get("final_output") - # Call original method with all parameters try: result = await original_execute_final_output(*args, **kwargs) finally: - # End span for current agent after final output processing is complete if agent and context_wrapper and _has_active_agent_span(context_wrapper): end_invoke_agent_span(context_wrapper, agent, final_output) - - # For streaming: close the workflow span if it exists - # (For non-streaming, the workflow span is closed by the context manager in _create_run_wrapper) - _close_workflow_span_on_error(agent) + # For streaming, close the workflow span (non-streaming uses context manager in _create_run_wrapper) + _close_streaming_workflow_span(agent) return result @@ -204,7 +194,6 @@ async def patched_run_single_turn_streamed( server_conversation_tracker, # args[8] (optional) ) """ - # Extract positional arguments (streaming version doesn't use keyword-only args) streamed_result = args[0] if len(args) > 0 else kwargs.get("streamed_result") agent = args[1] if len(args) > 1 else kwargs.get("agent") context_wrapper = args[3] if len(args) > 3 else kwargs.get("context_wrapper") @@ -214,7 +203,6 @@ async def patched_run_single_turn_streamed( else kwargs.get("should_run_agent_start_hooks", False) ) - # Build span kwargs with original_input from streamed_result for request messages span_kwargs: "dict[str, Any]" = {} if streamed_result and hasattr(streamed_result, "input"): span_kwargs["original_input"] = streamed_result.input @@ -233,7 +221,7 @@ async def patched_run_single_turn_streamed( if span is not None and span.timestamp is None: _record_exception_on_span(span, exc) end_invoke_agent_span(context_wrapper, agent) - _close_workflow_span_on_error(agent) + _close_streaming_workflow_span(agent) reraise(*sys.exc_info()) return result diff --git a/sentry_sdk/integrations/openai_agents/patches/models.py b/sentry_sdk/integrations/openai_agents/patches/models.py index fc0389d485..b8259a7724 100644 --- a/sentry_sdk/integrations/openai_agents/patches/models.py +++ b/sentry_sdk/integrations/openai_agents/patches/models.py @@ -1,4 +1,5 @@ import copy +import sys from functools import wraps from sentry_sdk.integrations import DidNotEnable @@ -88,34 +89,8 @@ async def wrapped_get_response(*args: "Any", **kwargs: "Any") -> "Any": @wraps(original_stream_response) async def wrapped_stream_response(*args: "Any", **kwargs: "Any") -> "Any": - """ - Wrap stream_response to create an AI client span for streaming. - stream_response is an async generator, so we yield events within the span. - - Note: We use explicit try/finally instead of a context manager because - if the consumer abandons the stream (breaks early, network error, etc.), - the context manager's __exit__ may not be called. With try/finally, - cleanup happens even when GeneratorExit is thrown. 
diff --git a/sentry_sdk/integrations/openai_agents/patches/models.py b/sentry_sdk/integrations/openai_agents/patches/models.py
index fc0389d485..b8259a7724 100644
--- a/sentry_sdk/integrations/openai_agents/patches/models.py
+++ b/sentry_sdk/integrations/openai_agents/patches/models.py
@@ -1,4 +1,5 @@
 import copy
+import sys
 from functools import wraps
 
 from sentry_sdk.integrations import DidNotEnable
@@ -88,34 +89,8 @@ async def wrapped_get_response(*args: "Any", **kwargs: "Any") -> "Any":
 
     @wraps(original_stream_response)
     async def wrapped_stream_response(*args: "Any", **kwargs: "Any") -> "Any":
-        """
-        Wrap stream_response to create an AI client span for streaming.
-        stream_response is an async generator, so we yield events within the span.
-
-        Note: We use explicit try/finally instead of a context manager because
-        if the consumer abandons the stream (breaks early, network error, etc.),
-        the context manager's __exit__ may not be called. With try/finally,
-        cleanup happens even when GeneratorExit is thrown.
-
-        Note: stream_response is called with positional args unlike get_response
-        which uses keyword args. The signature is:
-            stream_response(
-                system_instructions,  # args[0]
-                input,  # args[1]
-                model_settings,  # args[2]
-                tools,  # args[3]
-                output_schema,  # args[4]
-                handoffs,  # args[5]
-                tracing,  # args[6]
-                *,
-                previous_response_id,
-                conversation_id,
-                prompt,
-            )
-        """
-        import sys
-
-        # Build kwargs dict from positional args for span data capture
+        # Uses explicit try/finally instead of context manager to ensure cleanup
+        # even if the consumer abandons the stream (GeneratorExit).
         span_kwargs = dict(kwargs)
         if len(args) > 0:
             span_kwargs["system_instructions"] = args[0]
diff --git a/sentry_sdk/integrations/openai_agents/spans/ai_client.py b/sentry_sdk/integrations/openai_agents/spans/ai_client.py
index 85fc667515..c099f133f4 100644
--- a/sentry_sdk/integrations/openai_agents/spans/ai_client.py
+++ b/sentry_sdk/integrations/openai_agents/spans/ai_client.py
@@ -47,25 +47,14 @@ def update_ai_client_span(
     response: "Any",
     response_model: "Optional[str]" = None,
 ) -> None:
-    """
-    Update AI client span with response data.
-    Works for both streaming and non-streaming responses.
-
-    Args:
-        span: The span to update
-        response: The response object (ModelResponse for non-streaming, Response for streaming)
-        response_model: Optional response model string (used when captured from raw API response)
-    """
-    # Set usage data if available
+    """Update AI client span with response data (works for streaming and non-streaming)."""
     if hasattr(response, "usage") and response.usage:
         _set_usage_data(span, response.usage)
 
-    # Set output data and create MCP tool spans if available
     if hasattr(response, "output") and response.output:
         _set_output_data(span, response)
         _create_mcp_execute_tool_spans(span, response)
 
-    # Set response model - prefer explicit response_model, fall back to response.model
     if response_model is not None:
         span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, response_model)
     elif hasattr(response, "model") and response.model:
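With PATCH 9/9 applied, `update_ai_client_span` is a single duck-typed entry point: any response object exposing non-empty `usage`, `output`, or `model` attributes is handled, whether it came from a streaming or a non-streaming call. A toy rendering of that guard structure, using a plain dict in place of a span; the key names are illustrative only:

    from types import SimpleNamespace

    def update_span_data(data, response, response_model=None):
        # Same guard structure as update_ai_client_span: every field is
        # optional, and falsy values (missing usage, empty output) are skipped.
        if getattr(response, "usage", None):
            data["gen_ai.usage"] = response.usage
        if getattr(response, "output", None):
            data["gen_ai.response.output"] = response.output
        # Prefer an explicitly captured model name; fall back to response.model.
        if response_model is not None:
            data["gen_ai.response.model"] = response_model
        elif getattr(response, "model", None):
            data["gen_ai.response.model"] = response.model

    data = {}
    streamed = SimpleNamespace(usage={"total_tokens": 12}, output=["Hello!"], model=None)
    update_span_data(data, streamed, response_model="some-model")
    assert data == {
        "gen_ai.usage": {"total_tokens": 12},
        "gen_ai.response.output": ["Hello!"],
        "gen_ai.response.model": "some-model",
    }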