9 changes: 7 additions & 2 deletions sentry_sdk/integrations/openai_agents/__init__.py
@@ -4,6 +4,7 @@
_create_get_model_wrapper,
_create_get_all_tools_wrapper,
_create_run_wrapper,
_create_run_streamed_wrapper,
_patch_agent_run,
_patch_error_tracing,
)
@@ -25,12 +26,16 @@ def _patch_runner() -> None:
# Create the root span for one full agent run (including eventual handoffs)
# Note agents.run.DEFAULT_AGENT_RUNNER.run_sync is a wrapper around
# agents.run.DEFAULT_AGENT_RUNNER.run. It does not need to be wrapped separately.
# TODO-anton: Also patch streaming runner: agents.Runner.run_streamed
agents.run.DEFAULT_AGENT_RUNNER.run = _create_run_wrapper(
agents.run.DEFAULT_AGENT_RUNNER.run
)

# Creating the actual spans for each agent run.
# Patch streaming runner
agents.run.DEFAULT_AGENT_RUNNER.run_streamed = _create_run_streamed_wrapper(
agents.run.DEFAULT_AGENT_RUNNER.run_streamed
)

# Creating the actual spans for each agent run (works for both streaming and non-streaming).
_patch_agent_run()


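For orientation, a minimal usage sketch of what this PR enables, assuming the openai-agents package's public Agent/Runner API and a placeholder DSN (illustrative, not part of this diff):

import asyncio

import sentry_sdk
from agents import Agent, Runner  # openai-agents package


sentry_sdk.init(dsn="https://examplePublicKey@o0.ingest.sentry.io/0", traces_sample_rate=1.0)


async def main() -> None:
    agent = Agent(name="Assistant", instructions="Reply briefly.")
    # With run_streamed patched, streamed runs are instrumented like
    # Runner.run: a workflow span plus per-turn invoke_agent spans.
    result = Runner.run_streamed(agent, input="Hello")
    async for _event in result.stream_events():
        pass  # consume the stream so the spans can close


asyncio.run(main())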
sentry_sdk/integrations/openai_agents/patches/__init__.py
@@ -1,5 +1,5 @@
from .models import _create_get_model_wrapper # noqa: F401
from .tools import _create_get_all_tools_wrapper # noqa: F401
from .runner import _create_run_wrapper # noqa: F401
from .runner import _create_run_wrapper, _create_run_streamed_wrapper # noqa: F401
from .agent_run import _patch_agent_run # noqa: F401
from .error_tracing import _patch_error_tracing # noqa: F401
133 changes: 106 additions & 27 deletions sentry_sdk/integrations/openai_agents/patches/agent_run.py
@@ -1,6 +1,7 @@
import sys
from functools import wraps

from sentry_sdk.consts import SPANDATA
from sentry_sdk.integrations import DidNotEnable
from sentry_sdk.utils import reraise
from ..spans import (
@@ -31,22 +32,10 @@ def _patch_agent_run() -> None:

# Store original methods
original_run_single_turn = agents.run.AgentRunner._run_single_turn
original_run_single_turn_streamed = agents.run.AgentRunner._run_single_turn_streamed
original_execute_handoffs = agents._run_impl.RunImpl.execute_handoffs
original_execute_final_output = agents._run_impl.RunImpl.execute_final_output

def _start_invoke_agent_span(
context_wrapper: "agents.RunContextWrapper",
agent: "agents.Agent",
kwargs: "dict[str, Any]",
) -> "Span":
"""Start an agent invocation span"""
# Store the agent on the context wrapper so we can access it later
context_wrapper._sentry_current_agent = agent
span = invoke_agent_span(context_wrapper, agent, kwargs)
context_wrapper._sentry_agent_span = span

return span

def _has_active_agent_span(context_wrapper: "agents.RunContextWrapper") -> bool:
"""Check if there's an active agent span for this context"""
return getattr(context_wrapper, "_sentry_current_agent", None) is not None
@@ -57,6 +46,39 @@ def _get_current_agent(
"""Get the current agent from context wrapper"""
return getattr(context_wrapper, "_sentry_current_agent", None)

def _maybe_start_agent_span(
context_wrapper: "agents.RunContextWrapper",
agent: "agents.Agent",
should_run_agent_start_hooks: bool,
span_kwargs: "dict[str, Any]",
is_streaming: bool = False,
) -> "Optional[Span]":
"""
Start an agent invocation span if conditions are met.
Handles ending any existing span for a different agent.

Returns the new span if started, or the existing span if conditions aren't met.
"""
if not (should_run_agent_start_hooks and agent and context_wrapper):
return getattr(context_wrapper, "_sentry_agent_span", None)

# End any existing span for a different agent
if _has_active_agent_span(context_wrapper):
current_agent = _get_current_agent(context_wrapper)
if current_agent and current_agent != agent:
end_invoke_agent_span(context_wrapper, current_agent)

# Store the agent on the context wrapper so we can access it later
context_wrapper._sentry_current_agent = agent
span = invoke_agent_span(context_wrapper, agent, span_kwargs)
context_wrapper._sentry_agent_span = span
agent._sentry_agent_span = span

if is_streaming:
span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)

return span

@wraps(
original_run_single_turn.__func__
if hasattr(original_run_single_turn, "__func__")
@@ -68,28 +90,18 @@ async def patched_run_single_turn(
"""Patched _run_single_turn that creates agent invocation spans"""
agent = kwargs.get("agent")
context_wrapper = kwargs.get("context_wrapper")
should_run_agent_start_hooks = kwargs.get("should_run_agent_start_hooks")
should_run_agent_start_hooks = kwargs.get("should_run_agent_start_hooks", False)

span = getattr(context_wrapper, "_sentry_agent_span", None)
# Start agent span when agent starts (but only once per agent)
if should_run_agent_start_hooks and agent and context_wrapper:
# End any existing span for a different agent
if _has_active_agent_span(context_wrapper):
current_agent = _get_current_agent(context_wrapper)
if current_agent and current_agent != agent:
end_invoke_agent_span(context_wrapper, current_agent)
span = _maybe_start_agent_span(
context_wrapper, agent, should_run_agent_start_hooks, kwargs
)

span = _start_invoke_agent_span(context_wrapper, agent, kwargs)
agent._sentry_agent_span = span

# Call original method with all the correct parameters
try:
result = await original_run_single_turn(*args, **kwargs)
except Exception as exc:
if span is not None and span.timestamp is None:
_record_exception_on_span(span, exc)
end_invoke_agent_span(context_wrapper, agent)

reraise(*sys.exc_info())

return result
@@ -147,10 +159,77 @@ async def patched_execute_final_output(
if agent and context_wrapper and _has_active_agent_span(context_wrapper):
end_invoke_agent_span(context_wrapper, agent, final_output)

# For streaming: close the workflow span if it exists
# (For non-streaming, the workflow span is closed by the context manager in _create_run_wrapper)
if agent and hasattr(agent, "_sentry_workflow_span"):
workflow_span = agent._sentry_workflow_span
workflow_span.__exit__(None, None, None)
Workflow span ignores exception info when streaming fails

Medium Severity

When streaming execution fails during execute_final_output, the workflow span is closed with workflow_span.__exit__(None, None, None) instead of workflow_span.__exit__(*sys.exc_info()). Since this code is in a finally block, if an exception was raised in the try block, sys.exc_info() would contain the exception info. By passing (None, None, None), the workflow span won't properly indicate that an error occurred. This is inconsistent with the error handling in _create_run_streamed_wrapper (runner.py line 104), which correctly passes *sys.exc_info() when an exception occurs.
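A minimal sketch of the suggested fix, assuming the close does sit in a finally block as the comment describes (agent_run.py already imports sys at module top):

try:
    result = await original_execute_final_output(*args, **kwargs)
finally:
    if agent and hasattr(agent, "_sentry_workflow_span"):
        workflow_span = agent._sentry_workflow_span
        # Forward any in-flight exception to the workflow span;
        # sys.exc_info() is (None, None, None) when none is active.
        workflow_span.__exit__(*sys.exc_info())
        delattr(agent, "_sentry_workflow_span")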

delattr(agent, "_sentry_workflow_span")

return result

@wraps(
original_run_single_turn_streamed.__func__
if hasattr(original_run_single_turn_streamed, "__func__")
else original_run_single_turn_streamed
)
async def patched_run_single_turn_streamed(
cls: "agents.Runner", *args: "Any", **kwargs: "Any"
) -> "Any":
"""Patched _run_single_turn_streamed that creates agent invocation spans for streaming.

Note: Unlike _run_single_turn which uses keyword-only arguments (*,),
_run_single_turn_streamed uses positional arguments. The call signature is:
_run_single_turn_streamed(
streamed_result, # args[0]
agent, # args[1]
hooks, # args[2]
context_wrapper, # args[3]
run_config, # args[4]
should_run_agent_start_hooks, # args[5]
tool_use_tracker, # args[6]
all_tools, # args[7]
server_conversation_tracker, # args[8] (optional)
)
"""
# Extract positional arguments (streaming version doesn't use keyword-only args)
streamed_result = args[0] if len(args) > 0 else kwargs.get("streamed_result")
agent = args[1] if len(args) > 1 else kwargs.get("agent")
context_wrapper = args[3] if len(args) > 3 else kwargs.get("context_wrapper")
should_run_agent_start_hooks = bool(
args[5]
if len(args) > 5
else kwargs.get("should_run_agent_start_hooks", False)
)

# Build span kwargs with original_input from streamed_result for request messages
span_kwargs: "dict[str, Any]" = {}
if streamed_result and hasattr(streamed_result, "input"):
span_kwargs["original_input"] = streamed_result.input

span = _maybe_start_agent_span(
context_wrapper,
agent,
should_run_agent_start_hooks,
span_kwargs,
is_streaming=True,
)

try:
result = await original_run_single_turn_streamed(*args, **kwargs)
except Exception as exc:
if span is not None and span.timestamp is None:
_record_exception_on_span(span, exc)
end_invoke_agent_span(context_wrapper, agent)
reraise(*sys.exc_info())

return result

# Apply patches
agents.run.AgentRunner._run_single_turn = classmethod(patched_run_single_turn)
agents.run.AgentRunner._run_single_turn_streamed = classmethod(
patched_run_single_turn_streamed
)
agents._run_impl.RunImpl.execute_handoffs = classmethod(patched_execute_handoffs)
agents._run_impl.RunImpl.execute_final_output = classmethod(
patched_execute_final_output
93 changes: 79 additions & 14 deletions sentry_sdk/integrations/openai_agents/patches/models.py
@@ -9,15 +9,24 @@
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from typing import Any, Callable

from typing import Any, Callable, Optional

try:
import agents
except ImportError:
raise DidNotEnable("OpenAI Agents not installed")


def _set_response_model_on_agent_span(
agent: "agents.Agent", response_model: "Optional[str]"
) -> None:
"""Set the response model on the agent's invoke_agent span if available."""
if response_model:
agent_span = getattr(agent, "_sentry_agent_span", None)
if agent_span:
agent_span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, response_model)


def _create_get_model_wrapper(
original_get_model: "Callable[..., Any]",
) -> "Callable[..., Any]":
@@ -37,15 +46,19 @@ def wrapped_get_model(
# because we only patch its direct methods, all underlying data can remain unchanged.
model = copy.copy(original_get_model(agent, run_config))

# Wrap _fetch_response if it exists (for OpenAI models) to capture raw response model
# Capture the request model name for spans (agent.model can be None when using defaults)
request_model_name = model.model if hasattr(model, "model") else str(model)
agent._sentry_request_model = request_model_name

# Wrap _fetch_response if it exists (for OpenAI models) to capture response model
if hasattr(model, "_fetch_response"):
original_fetch_response = model._fetch_response

@wraps(original_fetch_response)
async def wrapped_fetch_response(*args: "Any", **kwargs: "Any") -> "Any":
response = await original_fetch_response(*args, **kwargs)
if hasattr(response, "model"):
agent._sentry_raw_response_model = str(response.model)
if hasattr(response, "model") and response.model:
agent._sentry_response_model = str(response.model)
return response

model._fetch_response = wrapped_fetch_response
@@ -57,22 +70,74 @@ async def wrapped_get_response(*args: "Any", **kwargs: "Any") -> "Any":
with ai_client_span(agent, kwargs) as span:
result = await original_get_response(*args, **kwargs)

response_model = getattr(agent, "_sentry_raw_response_model", None)
# Get response model captured from _fetch_response and clean up
response_model = getattr(agent, "_sentry_response_model", None)
if response_model:
agent_span = getattr(agent, "_sentry_agent_span", None)
if agent_span:
agent_span.set_data(
SPANDATA.GEN_AI_RESPONSE_MODEL, response_model
)
delattr(agent, "_sentry_response_model")

delattr(agent, "_sentry_raw_response_model")

update_ai_client_span(span, agent, kwargs, result, response_model)
_set_response_model_on_agent_span(agent, response_model)
update_ai_client_span(span, result, response_model)

return result

model.get_response = wrapped_get_response

# Also wrap stream_response for streaming support
if hasattr(model, "stream_response"):
original_stream_response = model.stream_response

@wraps(original_stream_response)
async def wrapped_stream_response(*args: "Any", **kwargs: "Any") -> "Any":
"""
Wrap stream_response to create an AI client span for streaming.
stream_response is an async generator, so we yield events within the span.
Note: stream_response is called with positional args unlike get_response
which uses keyword args. The signature is:
stream_response(
system_instructions, # args[0]
input, # args[1]
model_settings, # args[2]
tools, # args[3]
output_schema, # args[4]
handoffs, # args[5]
tracing, # args[6]
*,
previous_response_id,
conversation_id,
prompt,
)
"""
# Build kwargs dict from positional args for span data capture
span_kwargs = dict(kwargs)
if len(args) > 0:
span_kwargs["system_instructions"] = args[0]
if len(args) > 1:
span_kwargs["input"] = args[1]

with ai_client_span(agent, span_kwargs) as span:
span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)

streaming_response = None
async for event in original_stream_response(*args, **kwargs):
# Capture the full response from ResponseCompletedEvent
if hasattr(event, "response"):
streaming_response = event.response
yield event
Comment on lines +118 to +126

Bug: A synchronous with statement inside an async generator may not close the ai_client_span if the stream is abandoned, leading to unclosed spans.
Severity: HIGH

🔍 Detailed Analysis

The function wrapped_stream_response uses a synchronous context manager (with ai_client_span(...)) inside an async generator. If the consumer of this async generator abandons the stream, for instance by breaking out of the loop early or due to a network error, the context manager's __exit__ method will not be called deterministically. This results in the ai_client_span remaining open until garbage collection. This can lead to memory leaks from accumulating unclosed spans and incorrect performance metrics in long-running applications.

💡 Suggested Fix

Use contextlib.asynccontextmanager to create an async context manager for the span. Alternatively, manually manage the span's lifecycle with a try...finally block to ensure span.finish() is called even if the generator is abandoned, which guarantees cleanup.
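One way to realize the suggested try/finally fix, as a sketch (response capture omitted; assumes import sys at module top; deterministic cleanup still depends on the consumer or event loop closing the generator via aclose()):

@wraps(original_stream_response)
async def wrapped_stream_response(*args: "Any", **kwargs: "Any") -> "Any":
    span_cm = ai_client_span(agent, span_kwargs)  # span_kwargs built as above
    span = span_cm.__enter__()
    span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)
    try:
        async for event in original_stream_response(*args, **kwargs):
            yield event
    finally:
        # Runs on normal exhaustion, on errors, and on GeneratorExit when
        # the consumer abandons the stream, so the span is not leaked.
        span_cm.__exit__(*sys.exc_info())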


# Update span with response data (usage, output, model)
if streaming_response:
response_model = (
str(streaming_response.model)
if hasattr(streaming_response, "model")
and streaming_response.model
else None
)
_set_response_model_on_agent_span(agent, response_model)
update_ai_client_span(span, streaming_response)

model.stream_response = wrapped_stream_response

return model

return wrapped_get_model
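Taken together, a streamed run should produce roughly this span tree (inferred from this diff, not verified output):

workflow span (opened by _create_run_streamed_wrapper, closed in patched_execute_final_output)
└── invoke_agent span, with gen_ai.response.streaming set to True
    └── AI client span per model call from wrapped_stream_response, also marked as streaming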