From fae9d71ec9b9898840e03d462956d261c104cb2e Mon Sep 17 00:00:00 2001 From: Simon Hellmayr Date: Tue, 31 Mar 2026 09:39:25 +0200 Subject: [PATCH 1/4] fix(conversations): sort list by timestamp in frontend (#111785) - The API response cannot be relied upon to be correctly sorted currently - Quick frontend fix is to sort on the frontend Closes TET-2150 --- .../conversations/hooks/useConversations.tsx | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/static/app/views/insights/pages/conversations/hooks/useConversations.tsx b/static/app/views/insights/pages/conversations/hooks/useConversations.tsx index 84aeb39a95e9..1576103ffa9f 100644 --- a/static/app/views/insights/pages/conversations/hooks/useConversations.tsx +++ b/static/app/views/insights/pages/conversations/hooks/useConversations.tsx @@ -72,13 +72,15 @@ export function useConversations() { const pageLinks = getResponseHeader?.('Link'); const data = useMemo(() => { - return (rawData ?? []).map(({firstInput: rawFirstInput, ...rest}): Conversation => { - const firstInput = - typeof rawFirstInput === 'string' - ? rawFirstInput - : (rawFirstInput?.find(content => content.type === 'text')?.text ?? null); - return {...rest, firstInput}; - }); + return (rawData ?? []) + .map(({firstInput: rawFirstInput, ...rest}): Conversation => { + const firstInput = + typeof rawFirstInput === 'string' + ? rawFirstInput + : (rawFirstInput?.find(content => content.type === 'text')?.text ?? null); + return {...rest, firstInput}; + }) + .sort((a, b) => b.endTimestamp - a.endTimestamp); }, [rawData]); return { From f93bbecf17c7b874cbbc6dafe89006d5b969f1e2 Mon Sep 17 00:00:00 2001 From: Amir Mujacic Date: Tue, 31 Mar 2026 10:58:01 +0200 Subject: [PATCH 2/4] feat(integrations): Add on-demand source context fetching from SCM integrations (#110324) When stack trace frames lack inline source context, this feature fetches the file from the configured SCM integration (GitHub, GitLab, Perforce) and returns the surrounding lines via a new API endpoint. - New endpoint: /projects/{org}/{project}/stacktrace-source-context/ - New utility: fetch_source_context_from_scm() with caching - Feature-flagged behind organizations:scm-source-context --------- Co-authored-by: getsantry[bot] <66042841+getsantry[bot]@users.noreply.github.com> --- src/sentry/api/serializers/models/project.py | 4 + src/sentry/api/urls.py | 6 + .../apidocs/examples/project_examples.py | 1 + src/sentry/core/endpoints/project_details.py | 20 ++ src/sentry/features/temporary.py | 1 + .../integrations/utils/source_context.py | 243 ++++++++++++++++++ src/sentry/issues/endpoints/__init__.py | 2 + .../project_stacktrace_source_context.py | 79 ++++++ src/sentry/models/options/project_option.py | 1 + src/sentry/projectoptions/defaults.py | 3 + .../utils/api/knownSentryApiUrls.generated.ts | 1 + .../integrations/utils/test_source_context.py | 224 ++++++++++++++++ .../test_project_stacktrace_source_context.py | 143 +++++++++++ 13 files changed, 728 insertions(+) create mode 100644 src/sentry/integrations/utils/source_context.py create mode 100644 src/sentry/issues/endpoints/project_stacktrace_source_context.py create mode 100644 tests/sentry/integrations/utils/test_source_context.py create mode 100644 tests/sentry/issues/endpoints/test_project_stacktrace_source_context.py diff --git a/src/sentry/api/serializers/models/project.py b/src/sentry/api/serializers/models/project.py index 5d3b8c7a1919..25b5cce08e9e 100644 --- a/src/sentry/api/serializers/models/project.py +++ b/src/sentry/api/serializers/models/project.py @@ -964,6 +964,7 @@ class DetailedProjectResponse(ProjectWithTeamResponseDict): tempestFetchScreenshots: NotRequired[bool] autofixAutomationTuning: NotRequired[str] seerScannerAutomation: NotRequired[bool] + scmSourceContextEnabled: NotRequired[bool] debugFilesRole: NotRequired[str | None] @@ -1114,6 +1115,9 @@ def serialize( attrs, "sentry:seer_scanner_automation" ), "debugFilesRole": attrs["options"].get("sentry:debug_files_role"), + "scmSourceContextEnabled": self.get_value_with_default( + attrs, "sentry:scm_source_context_enabled" + ), } if has_tempest_access(obj.organization): diff --git a/src/sentry/api/urls.py b/src/sentry/api/urls.py index 0515bc29aa45..187c24d432cd 100644 --- a/src/sentry/api/urls.py +++ b/src/sentry/api/urls.py @@ -334,6 +334,7 @@ ProjectGroupIndexEndpoint, ProjectGroupStatsEndpoint, ProjectStacktraceLinkEndpoint, + ProjectStacktraceSourceContextEndpoint, RelatedIssuesEndpoint, SharedGroupDetailsEndpoint, ShortIdLookupEndpoint, @@ -3266,6 +3267,11 @@ def create_group_urls(name_prefix: str) -> list[URLPattern | URLResolver]: ProjectStacktraceLinkEndpoint.as_view(), name="sentry-api-0-project-stacktrace-link", ), + re_path( + r"^(?P[^/]+)/(?P[^/]+)/stacktrace-source-context/$", + ProjectStacktraceSourceContextEndpoint.as_view(), + name="sentry-api-0-project-stacktrace-source-context", + ), re_path( r"^(?P[^/]+)/(?P[^/]+)/repo-path-parsing/$", ProjectRepoPathParsingEndpoint.as_view(), diff --git a/src/sentry/apidocs/examples/project_examples.py b/src/sentry/apidocs/examples/project_examples.py index cbfe38e1490d..28ad1f7ccbd2 100644 --- a/src/sentry/apidocs/examples/project_examples.py +++ b/src/sentry/apidocs/examples/project_examples.py @@ -273,6 +273,7 @@ "highlightTags": [], "highlightContext": {}, "highlightPreset": {"tags": [], "context": {}}, + "scmSourceContextEnabled": False, } PROJECT_SUMMARY = { diff --git a/src/sentry/core/endpoints/project_details.py b/src/sentry/core/endpoints/project_details.py index 94f73a7923f9..48d9bfb77b6f 100644 --- a/src/sentry/core/endpoints/project_details.py +++ b/src/sentry/core/endpoints/project_details.py @@ -257,6 +257,10 @@ class ProjectAdminSerializer(ProjectMemberSerializer): targetSampleRate = serializers.FloatField(required=False, min_value=0, max_value=1) dynamicSamplingBiases = DynamicSamplingBiasSerializer(required=False, many=True) tempestFetchScreenshots = serializers.BooleanField(required=False) + scmSourceContextEnabled = serializers.BooleanField( + required=False, + help_text="Enable on-demand source context fetching from SCM integrations for stack traces.", + ) # DO NOT ADD MORE TO OPTIONS # Each param should be a field in the serializer like above. @@ -473,6 +477,15 @@ def validate_tempestFetchScreenshots(self, value): ) return value + def validate_scmSourceContextEnabled(self, value): + if value: + organization = self.context["project"].organization + if not features.has("organizations:scm-source-context", organization): + raise serializers.ValidationError( + "Organization does not have the SCM source context feature enabled." + ) + return value + def validate_debugFilesRole(self, value): if value is None: return value @@ -761,6 +774,13 @@ def put(self, request: Request, project) -> Response: changed_proj_settings["sentry:tempest_fetch_screenshots"] = result[ "tempestFetchScreenshots" ] + if result.get("scmSourceContextEnabled") is not None: + if project.update_option( + "sentry:scm_source_context_enabled", result["scmSourceContextEnabled"] + ): + changed_proj_settings["sentry:scm_source_context_enabled"] = result[ + "scmSourceContextEnabled" + ] if result.get("targetSampleRate") is not None: if project.update_option( "sentry:target_sample_rate", round(result["targetSampleRate"], 4) diff --git a/src/sentry/features/temporary.py b/src/sentry/features/temporary.py index 30d9f8e5afa0..7d43ad0f34b8 100644 --- a/src/sentry/features/temporary.py +++ b/src/sentry/features/temporary.py @@ -137,6 +137,7 @@ def register_temporary_features(manager: FeatureManager) -> None: manager.add("organizations:integrations-github-copilot-agent", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=True) manager.add("organizations:integrations-github-platform-detection", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=True) manager.add("organizations:integrations-perforce", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=True) + manager.add("organizations:scm-source-context", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=True) # Project Management Integrations Feature Parity Flags manager.add("organizations:integrations-github_enterprise-project-management", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=True) manager.add("organizations:integrations-gitlab-project-management", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=True) diff --git a/src/sentry/integrations/utils/source_context.py b/src/sentry/integrations/utils/source_context.py new file mode 100644 index 000000000000..cf07a961bdd1 --- /dev/null +++ b/src/sentry/integrations/utils/source_context.py @@ -0,0 +1,243 @@ +from __future__ import annotations + +import logging +from collections.abc import Sequence +from typing import TYPE_CHECKING, TypedDict + +from sentry.constants import ObjectStatus +from sentry.integrations.models.repository_project_path_config import RepositoryProjectPathConfig +from sentry.integrations.services.integration import integration_service +from sentry.integrations.source_code_management.repository import RepositoryIntegration +from sentry.issues.auto_source_code_config.code_mapping import ( + convert_stacktrace_frame_path_to_source_path, +) +from sentry.lang.javascript.utils import LINES_OF_CONTEXT, get_source_context +from sentry.shared_integrations.exceptions import ( + ApiError, + ApiRateLimitedError, + IntegrationConfigurationError, +) +from sentry.utils.event_frames import EventFrame + +if TYPE_CHECKING: + from sentry.integrations.base import IntegrationInstallation + from sentry.integrations.services.integration.model import RpcIntegration + from sentry.issues.endpoints.project_stacktrace_link import StacktraceLinkContext + from sentry.models.repository import Repository + +logger = logging.getLogger(__name__) + + +class SourceContextResult(TypedDict): + context: list[list[int | str]] # [[lineNo, content], ...] + error: str | None + source_url: str | None + + +def _format_context( + pre_context: list[bytes] | None, + context_line: bytes | None, + post_context: list[bytes] | None, + lineno: int, +) -> list[list[int | str]]: + """Format source context into [[lineNo, content], ...] tuples for the frontend.""" + result: list[list[int | str]] = [] + + start_line = lineno - len(pre_context or []) + + if pre_context: + for i, line in enumerate(pre_context): + result.append([start_line + i, line.decode("utf-8", errors="replace")]) + + if context_line is not None: + result.append([lineno, context_line.decode("utf-8", errors="replace")]) + + if post_context: + for i, line in enumerate(post_context): + result.append([lineno + 1 + i, line.decode("utf-8", errors="replace")]) + + return result + + +def _resolve_integration( + config: RepositoryProjectPathConfig, +) -> tuple[RpcIntegration, RepositoryIntegration] | None: + """Resolve the integration and installation for a code mapping config.""" + integration = integration_service.get_integration( + organization_integration_id=config.organization_integration_id, + status=ObjectStatus.ACTIVE, + ) + if not integration: + return None + + install = integration.get_installation(organization_id=config.project.organization_id) + if not isinstance(install, RepositoryIntegration): + return None + + return integration, install + + +def _fetch_file_from_scm( + install: IntegrationInstallation, + integration_id: int, + repository: Repository, + src_path: str, + ref: str, +) -> tuple[str | None, str | None]: + """ + Fetch file content from SCM. + + Returns (file_content, error). If error is "rate_limited", the caller + should stop iterating entirely. + """ + try: + client = install.get_client() + except Exception: + logger.warning( + "scm_source_context.get_client_error", + extra={ + "integration_id": integration_id, + "src_path": src_path, + }, + exc_info=True, + ) + return None, "integration_error" + + try: + file_content = client.get_file(repository, src_path, ref) + except NotImplementedError: + return None, "get_file_not_supported" + except ApiRateLimitedError: + return None, "rate_limited" + except ApiError as e: + if e.code == 404: + return None, "file_not_found" + elif e.code == 403: + return None, "integration_forbidden" + else: + logger.warning( + "scm_source_context.fetch_error", + extra={ + "error": str(e), + "integration_id": integration_id, + "src_path": src_path, + }, + ) + return None, "integration_error" + + return file_content, None + + +def _extract_source_lines( + file_content: str, + lineno: int, + context_lines: int, +) -> tuple[list[list[int | str]], str | None]: + """ + Extract context lines from file content around the given line number. + + Returns (context, error). + """ + lines = [line.encode("utf-8") for line in file_content.splitlines()] + + if lineno < 1 or lineno > len(lines): + return [], "line_out_of_range" + + pre_context, context_line, post_context = get_source_context(lines, lineno, context_lines) + + context = _format_context(pre_context, context_line, post_context, lineno) + return context, None + + +def fetch_source_context_from_scm( + configs: Sequence[RepositoryProjectPathConfig], + ctx: StacktraceLinkContext, + context_lines: int = LINES_OF_CONTEXT, +) -> SourceContextResult: + """ + Fetch source context lines from an SCM integration for a stack trace frame. + + Iterates code mappings to resolve the frame file path to a repository path, + then fetches the file content via the integration client and extracts the + surrounding lines. + """ + result: SourceContextResult = { + "context": [], + "error": None, + "source_url": None, + } + + line_no_str = ctx.get("line_no") + if not line_no_str: + result["error"] = "missing_line_number" + return result + + try: + lineno = int(line_no_str) + except (TypeError, ValueError): + result["error"] = "invalid_line_number" + return result + + frame = EventFrame.from_dict(ctx) + platform = ctx["platform"] + sdk_name = ctx.get("sdk_name") + + # Resolve integration and install once per unique org_integration_id + resolved_integrations: dict[int, tuple[RpcIntegration, RepositoryIntegration] | None] = {} + + for config in configs: + src_path = convert_stacktrace_frame_path_to_source_path( + frame=frame, + platform=platform, + sdk_name=sdk_name, + code_mapping=config, + ) + if not src_path: + continue + + org_integration_id = config.organization_integration_id + if org_integration_id not in resolved_integrations: + resolved_integrations[org_integration_id] = _resolve_integration(config) + + resolved = resolved_integrations[org_integration_id] + if resolved is None: + continue + + integration, install = resolved + + ref = ctx.get("commit_id") or str(config.default_branch or "") + + file_content, fetch_error = _fetch_file_from_scm( + install, integration.id, config.repository, src_path, ref + ) + + if fetch_error: + result["error"] = fetch_error + if fetch_error == "rate_limited": + return result + continue + + if file_content is None: + continue + + context, extract_error = _extract_source_lines(file_content, lineno, context_lines) + if extract_error: + result["error"] = extract_error + continue + + result["context"] = context + result["error"] = None + + try: + source_url = install.get_stacktrace_link( + config.repository, src_path, str(config.default_branch or ""), ctx.get("commit_id") + ) + result["source_url"] = source_url + except (ApiError, IntegrationConfigurationError): + pass + + return result + + if not result["error"]: + result["error"] = "no_code_mapping_match" + return result diff --git a/src/sentry/issues/endpoints/__init__.py b/src/sentry/issues/endpoints/__init__.py index a2ebb4670c4e..2163292b0215 100644 --- a/src/sentry/issues/endpoints/__init__.py +++ b/src/sentry/issues/endpoints/__init__.py @@ -30,6 +30,7 @@ from .project_group_index import ProjectGroupIndexEndpoint from .project_group_stats import ProjectGroupStatsEndpoint from .project_stacktrace_link import ProjectStacktraceLinkEndpoint +from .project_stacktrace_source_context import ProjectStacktraceSourceContextEndpoint from .related_issues import RelatedIssuesEndpoint from .shared_group_details import SharedGroupDetailsEndpoint from .source_map_debug import SourceMapDebugEndpoint @@ -66,6 +67,7 @@ "ProjectGroupIndexEndpoint", "ProjectGroupStatsEndpoint", "ProjectStacktraceLinkEndpoint", + "ProjectStacktraceSourceContextEndpoint", "RelatedIssuesEndpoint", "SharedGroupDetailsEndpoint", "ShortIdLookupEndpoint", diff --git a/src/sentry/issues/endpoints/project_stacktrace_source_context.py b/src/sentry/issues/endpoints/project_stacktrace_source_context.py new file mode 100644 index 000000000000..69c2ac577d75 --- /dev/null +++ b/src/sentry/issues/endpoints/project_stacktrace_source_context.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +import logging + +from rest_framework.request import Request +from rest_framework.response import Response + +from sentry import features +from sentry.api.api_owners import ApiOwner +from sentry.api.api_publish_status import ApiPublishStatus +from sentry.api.base import cell_silo_endpoint +from sentry.api.bases.project import ProjectEndpoint +from sentry.integrations.utils.source_context import fetch_source_context_from_scm +from sentry.issues.auto_source_code_config.code_mapping import get_sorted_code_mapping_configs +from sentry.issues.endpoints.project_stacktrace_link import generate_context +from sentry.models.project import Project + +logger = logging.getLogger(__name__) + + +@cell_silo_endpoint +class ProjectStacktraceSourceContextEndpoint(ProjectEndpoint): + publish_status = { + "GET": ApiPublishStatus.PRIVATE, + } + """ + Returns source context lines for a stack trace frame by fetching the file + from the configured SCM integration (GitHub, GitLab, Perforce, etc.). + + `file`: The file path from the stack trace (required) + `lineNo`: The line number to fetch context around (required) + `commitId` (optional): The commit_id for the last commit of the release + `platform` (optional): The platform of the event + `sdkName` (optional): The sdk.name associated with the event + `absPath` (optional): The abs_path field value of the relevant stack frame + `module` (optional): The module field value of the relevant stack frame + `package` (optional): The package field value of the relevant stack frame + """ + + owner = ApiOwner.ISSUES + + def get(self, request: Request, project: Project) -> Response: + if not features.has( + "organizations:scm-source-context", + project.organization, + actor=request.user, + ): + return Response(status=404) + + if not project.get_option("sentry:scm_source_context_enabled", False): + return Response(status=404) + + ctx = generate_context(request.GET) + + if ctx["file"] == "": + return Response({"detail": "Filepath is required"}, status=400) + + if not ctx["line_no"]: + return Response({"detail": "lineNo is required"}, status=400) + + configs = get_sorted_code_mapping_configs(project) + if not configs: + return Response( + { + "context": [], + "error": "no_code_mappings_for_project", + "sourceUrl": None, + } + ) + + result = fetch_source_context_from_scm(configs, ctx) + + return Response( + { + "context": result["context"], + "error": result["error"], + "sourceUrl": result["source_url"], + } + ) diff --git a/src/sentry/models/options/project_option.py b/src/sentry/models/options/project_option.py index 8ff587fb2e42..c5f3690a65b7 100644 --- a/src/sentry/models/options/project_option.py +++ b/src/sentry/models/options/project_option.py @@ -78,6 +78,7 @@ "sentry:preprod_snapshot_status_checks_fail_on_added", "sentry:preprod_snapshot_status_checks_fail_on_removed", "sentry:preprod_distribution_pr_comments_enabled_by_customer", + "sentry:scm_source_context_enabled", "quotas:spike-protection-disabled", "feedback:branding", "digests:mail:minimum_delay", diff --git a/src/sentry/projectoptions/defaults.py b/src/sentry/projectoptions/defaults.py index 4534fbdedbbe..6a29b4add734 100644 --- a/src/sentry/projectoptions/defaults.py +++ b/src/sentry/projectoptions/defaults.py @@ -210,3 +210,6 @@ # Boolean to enable/disable build distribution PR comments for this project. register(key="sentry:preprod_distribution_pr_comments_enabled_by_customer", default=True) + +# Whether to enable on-demand source context fetching from SCM integrations +register(key="sentry:scm_source_context_enabled", default=False) diff --git a/static/app/utils/api/knownSentryApiUrls.generated.ts b/static/app/utils/api/knownSentryApiUrls.generated.ts index 79a6c8e82e68..9bd1a50fefc8 100644 --- a/static/app/utils/api/knownSentryApiUrls.generated.ts +++ b/static/app/utils/api/knownSentryApiUrls.generated.ts @@ -721,6 +721,7 @@ export type KnownSentryApiUrls = | '/projects/$organizationIdOrSlug/$projectIdOrSlug/seer/preferences/' | '/projects/$organizationIdOrSlug/$projectIdOrSlug/stacktrace-coverage/' | '/projects/$organizationIdOrSlug/$projectIdOrSlug/stacktrace-link/' + | '/projects/$organizationIdOrSlug/$projectIdOrSlug/stacktrace-source-context/' | '/projects/$organizationIdOrSlug/$projectIdOrSlug/statistical-detector/' | '/projects/$organizationIdOrSlug/$projectIdOrSlug/stats/' | '/projects/$organizationIdOrSlug/$projectIdOrSlug/symbol-sources/' diff --git a/tests/sentry/integrations/utils/test_source_context.py b/tests/sentry/integrations/utils/test_source_context.py new file mode 100644 index 000000000000..b16c671b7466 --- /dev/null +++ b/tests/sentry/integrations/utils/test_source_context.py @@ -0,0 +1,224 @@ +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +from sentry.integrations.utils.source_context import ( + _format_context, + fetch_source_context_from_scm, +) +from sentry.issues.endpoints.project_stacktrace_link import StacktraceLinkContext +from sentry.shared_integrations.exceptions import ApiError, ApiRateLimitedError +from sentry.silo.base import SiloMode +from sentry.testutils.cases import TestCase +from sentry.testutils.silo import assume_test_silo_mode + + +class FormatContextTest(TestCase): + def test_format_context_basic(self) -> None: + pre = [b"line1", b"line2"] + ctx_line = b"line3" + post = [b"line4", b"line5"] + result = _format_context(pre, ctx_line, post, lineno=3) + assert result == [ + [1, "line1"], + [2, "line2"], + [3, "line3"], + [4, "line4"], + [5, "line5"], + ] + + def test_format_context_no_pre_or_post(self) -> None: + result = _format_context(None, b"only_line", None, lineno=1) + assert result == [[1, "only_line"]] + + def test_format_context_none_context_line(self) -> None: + result = _format_context(None, None, None, lineno=1) + assert result == [] + + def test_format_context_utf8_decode(self) -> None: + result = _format_context(None, "héllo".encode(), None, lineno=1) + assert result == [[1, "héllo"]] + + +class FetchSourceContextTest(TestCase): + def setUp(self) -> None: + super().setUp() + with assume_test_silo_mode(SiloMode.CONTROL): + self.integration, self.oi = self.create_provider_integration_for( + self.organization, self.user, provider="example", name="Example" + ) + + self.repo = self.create_repo( + project=self.project, + name="getsentry/sentry", + ) + self.repo.integration_id = self.integration.id + self.repo.provider = "example" + self.repo.save() + + self.code_mapping = self.create_code_mapping( + organization_integration=self.oi, + project=self.project, + repo=self.repo, + stack_root="src/", + source_root="src/", + ) + + def _make_ctx(self, **overrides: str | None) -> StacktraceLinkContext: + defaults: StacktraceLinkContext = { + "file": "src/app/main.py", + "filename": "src/app/main.py", + "platform": "python", + "abs_path": "src/app/main.py", + "commit_id": None, + "group_id": None, + "line_no": "10", + "module": None, + "package": None, + "sdk_name": None, + } + defaults.update(overrides) # type: ignore[typeddict-item] + return defaults + + def test_missing_line_number(self) -> None: + ctx = self._make_ctx(line_no=None) + result = fetch_source_context_from_scm([self.code_mapping], ctx) + assert result["error"] == "missing_line_number" + assert result["context"] == [] + + def test_invalid_line_number(self) -> None: + ctx = self._make_ctx(line_no="abc") + result = fetch_source_context_from_scm([self.code_mapping], ctx) + assert result["error"] == "invalid_line_number" + assert result["context"] == [] + + def test_no_code_mapping_match(self) -> None: + ctx = self._make_ctx( + file="unknown/path.py", filename="unknown/path.py", abs_path="unknown/path.py" + ) + result = fetch_source_context_from_scm([self.code_mapping], ctx) + assert result["error"] == "no_code_mapping_match" + assert result["context"] == [] + + @patch("sentry.integrations.utils.source_context.integration_service") + def test_integration_not_found(self, mock_service: MagicMock) -> None: + mock_service.get_integration.return_value = None + ctx = self._make_ctx() + result = fetch_source_context_from_scm([self.code_mapping], ctx) + assert result["error"] == "no_code_mapping_match" + + @patch("sentry.integrations.utils.source_context.integration_service") + def test_successful_fetch(self, mock_service: MagicMock) -> None: + file_content = "\n".join([f"line{i}" for i in range(1, 20)]) + + mock_integration = MagicMock() + mock_service.get_integration.return_value = mock_integration + mock_install = MagicMock() + mock_integration.get_installation.return_value = mock_install + mock_client = MagicMock() + mock_install.get_client.return_value = mock_client + mock_client.get_file.return_value = file_content + mock_install.get_stacktrace_link.return_value = "https://github.com/example" + + # Make mock pass isinstance check + from sentry.integrations.source_code_management.repository import RepositoryIntegration + + mock_install.__class__ = RepositoryIntegration # type: ignore[assignment] + + ctx = self._make_ctx(line_no="10") + result = fetch_source_context_from_scm([self.code_mapping], ctx) + + assert result["error"] is None + assert len(result["context"]) == 11 # 5 pre + 1 context + 5 post + assert result["context"][5] == [10, "line10"] + assert result["source_url"] == "https://github.com/example" + + @patch("sentry.integrations.utils.source_context.integration_service") + def test_get_file_not_supported(self, mock_service: MagicMock) -> None: + mock_integration = MagicMock() + mock_service.get_integration.return_value = mock_integration + mock_install = MagicMock() + mock_integration.get_installation.return_value = mock_install + mock_client = MagicMock() + mock_install.get_client.return_value = mock_client + mock_client.get_file.side_effect = NotImplementedError + + from sentry.integrations.source_code_management.repository import RepositoryIntegration + + mock_install.__class__ = RepositoryIntegration # type: ignore[assignment] + + ctx = self._make_ctx() + result = fetch_source_context_from_scm([self.code_mapping], ctx) + assert result["error"] == "get_file_not_supported" + + @patch("sentry.integrations.utils.source_context.integration_service") + def test_rate_limited(self, mock_service: MagicMock) -> None: + mock_integration = MagicMock() + mock_service.get_integration.return_value = mock_integration + mock_install = MagicMock() + mock_integration.get_installation.return_value = mock_install + mock_client = MagicMock() + mock_install.get_client.return_value = mock_client + mock_client.get_file.side_effect = ApiRateLimitedError("rate limited") + + from sentry.integrations.source_code_management.repository import RepositoryIntegration + + mock_install.__class__ = RepositoryIntegration # type: ignore[assignment] + + ctx = self._make_ctx() + result = fetch_source_context_from_scm([self.code_mapping], ctx) + assert result["error"] == "rate_limited" + + @patch("sentry.integrations.utils.source_context.integration_service") + def test_file_not_found(self, mock_service: MagicMock) -> None: + mock_integration = MagicMock() + mock_service.get_integration.return_value = mock_integration + mock_install = MagicMock() + mock_integration.get_installation.return_value = mock_install + mock_client = MagicMock() + mock_install.get_client.return_value = mock_client + mock_client.get_file.side_effect = ApiError("Not Found", code=404) + + from sentry.integrations.source_code_management.repository import RepositoryIntegration + + mock_install.__class__ = RepositoryIntegration # type: ignore[assignment] + + ctx = self._make_ctx() + result = fetch_source_context_from_scm([self.code_mapping], ctx) + assert result["error"] == "file_not_found" + + @patch("sentry.integrations.utils.source_context.integration_service") + def test_line_out_of_range(self, mock_service: MagicMock) -> None: + file_content = "line1\nline2\nline3" + + mock_integration = MagicMock() + mock_service.get_integration.return_value = mock_integration + mock_install = MagicMock() + mock_integration.get_installation.return_value = mock_install + mock_client = MagicMock() + mock_install.get_client.return_value = mock_client + mock_client.get_file.return_value = file_content + + from sentry.integrations.source_code_management.repository import RepositoryIntegration + + mock_install.__class__ = RepositoryIntegration # type: ignore[assignment] + + ctx = self._make_ctx(line_no="100") + result = fetch_source_context_from_scm([self.code_mapping], ctx) + assert result["error"] == "line_out_of_range" + + @patch("sentry.integrations.utils.source_context.integration_service") + def test_get_client_exception(self, mock_service: MagicMock) -> None: + mock_integration = MagicMock() + mock_service.get_integration.return_value = mock_integration + mock_install = MagicMock() + mock_integration.get_installation.return_value = mock_install + mock_install.get_client.side_effect = Exception("identity not found") + + from sentry.integrations.source_code_management.repository import RepositoryIntegration + + mock_install.__class__ = RepositoryIntegration # type: ignore[assignment] + + ctx = self._make_ctx() + result = fetch_source_context_from_scm([self.code_mapping], ctx) + assert result["error"] == "integration_error" diff --git a/tests/sentry/issues/endpoints/test_project_stacktrace_source_context.py b/tests/sentry/issues/endpoints/test_project_stacktrace_source_context.py new file mode 100644 index 000000000000..f7415941c12a --- /dev/null +++ b/tests/sentry/issues/endpoints/test_project_stacktrace_source_context.py @@ -0,0 +1,143 @@ +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +from sentry.silo.base import SiloMode +from sentry.testutils.cases import APITestCase +from sentry.testutils.silo import assume_test_silo_mode + + +class ProjectStacktraceSourceContextTest(APITestCase): + endpoint = "sentry-api-0-project-stacktrace-source-context" + + def setUp(self) -> None: + super().setUp() + with assume_test_silo_mode(SiloMode.CONTROL): + self.integration, self.oi = self.create_provider_integration_for( + self.organization, self.user, provider="example", name="Example" + ) + + self.repo = self.create_repo( + project=self.project, + name="getsentry/sentry", + ) + self.repo.integration_id = self.integration.id + self.repo.provider = "example" + self.repo.save() + + self.code_mapping = self.create_code_mapping( + organization_integration=self.oi, + project=self.project, + repo=self.repo, + stack_root="src/", + source_root="src/", + ) + + self.login_as(self.user) + self.project.update_option("sentry:scm_source_context_enabled", True) + + def test_feature_flag_required(self) -> None: + response = self.get_response( + self.organization.slug, + self.project.slug, + qs_params={"file": "src/main.py", "lineNo": "10", "platform": "python"}, + ) + assert response.status_code == 404 + + def test_project_option_required(self) -> None: + self.project.update_option("sentry:scm_source_context_enabled", False) + with self.feature("organizations:scm-source-context"): + response = self.get_response( + self.organization.slug, + self.project.slug, + qs_params={"file": "src/main.py", "lineNo": "10", "platform": "python"}, + ) + assert response.status_code == 404 + + def test_missing_filepath(self) -> None: + with self.feature("organizations:scm-source-context"): + response = self.get_response( + self.organization.slug, + self.project.slug, + qs_params={"lineNo": "10", "platform": "python"}, + ) + assert response.status_code == 400 + + def test_missing_lineno(self) -> None: + with self.feature("organizations:scm-source-context"): + response = self.get_response( + self.organization.slug, + self.project.slug, + qs_params={"file": "src/main.py", "platform": "python"}, + ) + assert response.status_code == 400 + + def test_no_code_mappings(self) -> None: + self.code_mapping.delete() + with self.feature("organizations:scm-source-context"): + response = self.get_success_response( + self.organization.slug, + self.project.slug, + qs_params={"file": "src/main.py", "lineNo": "10", "platform": "python"}, + ) + assert response.data["error"] == "no_code_mappings_for_project" + assert response.data["context"] == [] + + @patch("sentry.integrations.utils.source_context.integration_service") + def test_successful_fetch(self, mock_service: MagicMock) -> None: + file_content = "\n".join([f"line{i}" for i in range(1, 20)]) + + mock_integration = MagicMock() + mock_service.get_integration.return_value = mock_integration + mock_install = MagicMock() + mock_integration.get_installation.return_value = mock_install + mock_client = MagicMock() + mock_install.get_client.return_value = mock_client + mock_client.get_file.return_value = file_content + mock_install.get_stacktrace_link.return_value = "https://github.com/file" + + from sentry.integrations.source_code_management.repository import RepositoryIntegration + + mock_install.__class__ = RepositoryIntegration # type: ignore[assignment] + + with self.feature("organizations:scm-source-context"): + response = self.get_success_response( + self.organization.slug, + self.project.slug, + qs_params={ + "file": "src/app/main.py", + "lineNo": "10", + "platform": "python", + }, + ) + assert response.data["error"] is None + assert len(response.data["context"]) == 11 + assert response.data["context"][5] == [10, "line10"] + assert response.data["sourceUrl"] == "https://github.com/file" + + @patch("sentry.integrations.utils.source_context.integration_service") + def test_file_not_found(self, mock_service: MagicMock) -> None: + from sentry.integrations.source_code_management.repository import RepositoryIntegration + from sentry.shared_integrations.exceptions import ApiError + + mock_integration = MagicMock() + mock_service.get_integration.return_value = mock_integration + mock_install = MagicMock() + mock_integration.get_installation.return_value = mock_install + mock_install.__class__ = RepositoryIntegration # type: ignore[assignment] + mock_client = MagicMock() + mock_install.get_client.return_value = mock_client + mock_client.get_file.side_effect = ApiError("Not Found", code=404) + + with self.feature("organizations:scm-source-context"): + response = self.get_success_response( + self.organization.slug, + self.project.slug, + qs_params={ + "file": "src/app/main.py", + "lineNo": "10", + "platform": "python", + }, + ) + assert response.data["error"] == "file_not_found" + assert response.data["context"] == [] From b71c092f6de4d58e2435f24964f303d27bf7695f Mon Sep 17 00:00:00 2001 From: Giovanni Barillari Date: Tue, 31 Mar 2026 11:31:42 +0200 Subject: [PATCH 3/4] feat(hybridcloud): async apigateway (#111307) This changes the `apigateway` proxy to be async, with the idea to serve the relevant deployment of control silo in ASGI rather than WSGI. The rationale here is to avoid situations in which we exhaust the server's threadpool by just waiting for apigateway requests to complete, as we saw in INCs 2054/2056. **Note:** the APIGateway changes are gated into a separated Python module, the async flow is enabled through `SENTRY_ASYNC_APIGW` environment variable. This allows us to control the rollout of the change in prod. Tests and local devserver are instead always using the new code. Detailed changes: - [x] Make APIGateway proxy `async`, switching inner client impl from `requests` to `httpx` - [x] Change APIGateway middleware to work both in ASGI and WSGI contexts (with the latter using `async_to_sync`) - [x] Update relevant tests interacting with APIGateway - [x] Fix proxy acceptance test - Fix ORM calls in the custom SDK integration - [x] Bypass ORM calls in SDK custom logging integration - [x] Restore/adapt circuit brakers --- .github/CODEOWNERS | 1 + pyproject.toml | 3 +- src/sentry/asgi.py | 18 ++ src/sentry/conf/server.py | 11 +- .../hybridcloud/apigateway_async/__init__.py | 3 + .../apigateway_async/apigateway.py | 108 ++++++++ .../apigateway_async/circuitbreaker.py | 104 ++++++++ .../apigateway_async/middleware.py | 78 ++++++ .../hybridcloud/apigateway_async/proxy.py | 219 ++++++++++++++++ .../objectstore/endpoints/organization.py | 39 ++- src/sentry/runner/commands/devserver.py | 2 + src/sentry/services/http.py | 7 +- src/sentry/silo/base.py | 3 +- src/sentry/testutils/asserts.py | 6 +- src/sentry/testutils/cases.py | 67 ++--- src/sentry/testutils/helpers/apigateway.py | 156 +++++++++--- src/sentry/testutils/helpers/response.py | 15 +- src/sentry/testutils/silo.py | 39 ++- src/sentry/utils/http.py | 46 +++- src/sentry/utils/sdk.py | 10 + tests/acceptance/test_proxy.py | 3 +- tests/conftest.py | 3 + .../hybridcloud/apigateway/test_apigateway.py | 92 +++---- .../apigateway/test_apigateway_helpers.py | 21 +- .../hybridcloud/apigateway/test_proxy.py | 236 ++---------------- tests/sentry/middleware/test_proxy.py | 28 ++- .../endpoints/test_organization.py | 71 ++++-- uv.lock | 25 +- 28 files changed, 1023 insertions(+), 391 deletions(-) create mode 100644 src/sentry/asgi.py create mode 100644 src/sentry/hybridcloud/apigateway_async/__init__.py create mode 100644 src/sentry/hybridcloud/apigateway_async/apigateway.py create mode 100644 src/sentry/hybridcloud/apigateway_async/circuitbreaker.py create mode 100644 src/sentry/hybridcloud/apigateway_async/middleware.py create mode 100644 src/sentry/hybridcloud/apigateway_async/proxy.py diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index a2f08ede5ed7..fd5d107b0a33 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -35,6 +35,7 @@ /src/sentry/projectoptions/ @getsentry/app-backend /src/sentry/receivers/ @getsentry/app-backend /src/sentry/ratelimits/ @getsentry/app-backend +/src/sentry/asgi.py @getsentry/app-backend /tests/js/sentry-test/ @getsentry/app-frontend /static/app/utils/ @getsentry/app-frontend diff --git a/pyproject.toml b/pyproject.toml index cf052bb24e53..60264b09ad04 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,7 +37,7 @@ dependencies = [ "google-cloud-storage-transfer>=1.17.0", "google-crc32c>=1.6.0", "googleapis-common-protos>=1.63.2", - "granian[pname,reload]>=2.7", + "granian[pname,reload,uvloop]>=2.7", "grpc-google-iam-v1>=0.13.1", # Note, grpcio>1.30.0 requires setting GRPC_POLL_STRATEGY=epoll1 # See https://github.com/grpc/grpc/issues/23796 and @@ -45,6 +45,7 @@ dependencies = [ "grpcio>=1.67.0", # not directly used, but provides a speedup for redis "hiredis>=2.3.2", + "httpx>=0.28.1", "jsonschema>=4.20.0", "lxml>=5.3.0", "maxminddb>=2.3.0", diff --git a/src/sentry/asgi.py b/src/sentry/asgi.py new file mode 100644 index 000000000000..e27dba83252c --- /dev/null +++ b/src/sentry/asgi.py @@ -0,0 +1,18 @@ +import os.path +import sys + +# Add the project to the python path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), os.pardir)) + +# Configure the application only if it seemingly isn't already configured +from django.conf import settings + +if not settings.configured: + from sentry.runner import configure + + configure() + +from django.core.handlers.asgi import ASGIHandler + +# Run ASGI handler for the application +application = ASGIHandler() diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 7f448f1cc9ed..02e2e9f64e88 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -371,6 +371,12 @@ def env( # so that responses aren't modified after Content-Length is set, or have the # response modifying middleware reset the Content-Length header. # This is because CommonMiddleware Sets the Content-Length header for non-streaming responses. +APIGW_ASYNC = os.environ.get("SENTRY_APIGW_ASYNC", "").lower() in ("1", "true", "y", "yes") +APIGW_MIDDLEWARE = ( + "sentry.hybridcloud.apigateway_async.middleware.ApiGatewayMiddleware" + if APIGW_ASYNC + else "sentry.hybridcloud.apigateway.middleware.ApiGatewayMiddleware" +) MIDDLEWARE: tuple[str, ...] = ( "csp.middleware.CSPMiddleware", "sentry.middleware.health.HealthCheck", @@ -387,7 +393,7 @@ def env( "sentry.middleware.auth.AuthenticationMiddleware", "sentry.middleware.ai_agent.AIAgentMiddleware", "sentry.middleware.integrations.IntegrationControlMiddleware", - "sentry.hybridcloud.apigateway.middleware.ApiGatewayMiddleware", + APIGW_MIDDLEWARE, "sentry.middleware.demo_mode_guard.DemoModeGuardMiddleware", "sentry.middleware.customer_domain.CustomerDomainMiddleware", "sentry.middleware.sudo.SudoMiddleware", @@ -3181,6 +3187,9 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]: } # Used in tests to skip forwarding relay paths to a region silo that does not exist. APIGATEWAY_PROXY_SKIP_RELAY = False +APIGATEWAY_PROXY_MAX_CONCURRENCY = int(os.environ.get("SENTRY_APIGW_PROXY_MAX_CONCURRENCY", 100)) +APIGATEWAY_PROXY_MAX_FAILURES = int(os.environ.get("SENTRY_APIGW_PROXY_MAX_FAILURES", 100)) +APIGATEWAY_PROXY_FAILURE_WINDOW = int(os.environ.get("SENTRY_APIGW_PROXY_FAILURE_WINDOW", 60)) # Shared resource ids for accounting EVENT_PROCESSING_STORE = "rc_processing_redis" diff --git a/src/sentry/hybridcloud/apigateway_async/__init__.py b/src/sentry/hybridcloud/apigateway_async/__init__.py new file mode 100644 index 000000000000..e4b8561f6bee --- /dev/null +++ b/src/sentry/hybridcloud/apigateway_async/__init__.py @@ -0,0 +1,3 @@ +from .apigateway import proxy_request_if_needed + +__all__ = ("proxy_request_if_needed",) diff --git a/src/sentry/hybridcloud/apigateway_async/apigateway.py b/src/sentry/hybridcloud/apigateway_async/apigateway.py new file mode 100644 index 000000000000..3e02e6613aee --- /dev/null +++ b/src/sentry/hybridcloud/apigateway_async/apigateway.py @@ -0,0 +1,108 @@ +from __future__ import annotations + +import logging +from collections.abc import Callable +from typing import Any + +from django.conf import settings +from django.http.response import HttpResponseBase +from rest_framework.request import Request + +from sentry.silo.base import SiloLimit, SiloMode +from sentry.types.cell import get_cell_by_name +from sentry.utils import metrics + +from .proxy import ( + proxy_cell_request, + proxy_error_embed_request, + proxy_request, +) + +logger = logging.getLogger(__name__) + + +def _get_view_silo_mode(view_func: Callable[..., HttpResponseBase]) -> frozenset[SiloMode] | None: + view_class = getattr(view_func, "view_class", None) + if not view_class: + return None + if not hasattr(view_class, "silo_limit"): + return None + endpoint_silo_limit: SiloLimit = view_class.silo_limit + return endpoint_silo_limit.modes + + +async def proxy_request_if_needed( + request: Request, + view_func: Callable[..., HttpResponseBase], + view_kwargs: dict[str, Any], +) -> HttpResponseBase | None: + """ + Main execution flow for the API Gateway. + returns None if proxying is not required, or a response if the proxy was successful. + """ + current_silo_mode = SiloMode.get_current_mode() + if current_silo_mode != SiloMode.CONTROL: + return None + + silo_modes = _get_view_silo_mode(view_func) + if not silo_modes or current_silo_mode in silo_modes: + return None + + url_name = "unknown" + if request.resolver_match: + url_name = request.resolver_match.url_name or url_name + + if "organization_slug" in view_kwargs or "organization_id_or_slug" in view_kwargs: + org_id_or_slug = str( + view_kwargs.get("organization_slug") or view_kwargs.get("organization_id_or_slug", "") + ) + + metrics.incr( + "apigateway.proxy_request", + tags={ + "url_name": url_name, + "kind": "orgslug", + }, + ) + return await proxy_request(request, org_id_or_slug, url_name) + + if url_name == "sentry-error-page-embed" and "dsn" in request.GET: + # Error embed modal is special as customers can't easily use cell URLs. + dsn = request.GET["dsn"] + metrics.incr( + "apigateway.proxy_request", + tags={ + "url_name": url_name, + "kind": "error-embed", + }, + ) + return await proxy_error_embed_request(request, dsn, url_name) + + if ( + request.resolver_match + and request.resolver_match.url_name in settings.REGION_PINNED_URL_NAMES + ): + cell = get_cell_by_name(settings.SENTRY_MONOLITH_REGION) + metrics.incr( + "apigateway.proxy_request", + tags={ + "url_name": url_name, + "kind": "regionpin", + }, + ) + + return await proxy_cell_request(request, cell, url_name) + + if url_name != "unknown": + # If we know the URL but didn't proxy it record we could be missing + # URL handling and that needs to be fixed. + metrics.incr( + "apigateway.proxy_request", + tags={ + "kind": "noop", + "url_name": url_name, + }, + ) + logger.info("apigateway.unknown_url", extra={"url": request.path}) + + return None diff --git a/src/sentry/hybridcloud/apigateway_async/circuitbreaker.py b/src/sentry/hybridcloud/apigateway_async/circuitbreaker.py new file mode 100644 index 000000000000..e43500fccfa9 --- /dev/null +++ b/src/sentry/hybridcloud/apigateway_async/circuitbreaker.py @@ -0,0 +1,104 @@ +from __future__ import annotations + +import asyncio +import time +from collections import defaultdict +from typing import Any + +from django.conf import settings + + +class CircuitBreaker: + __slots__ = [ + "concurrency", + "counter_window", + "failures", + "semaphore", + "_clock", + "_counters", + "_counter_idx", + ] + + def __init__(self, concurrency: int, failures: tuple[int, int]) -> None: + self.concurrency = concurrency + self.counter_window = failures[0] + self.failures = failures[1] + self.semaphore = asyncio.Semaphore(self.concurrency) + self._clock = 0 + self._counters = [0, 0] + self._counter_idx = 0 + + def _counter_flip(self, clock: int) -> None: + self._clock = clock + prev = self._counter_idx + self._counter_idx = 1 - prev + self._counters[prev] = 0 + + def _maybe_counter_flip(self) -> None: + now = int(time.monotonic()) + delta = now - self._clock + if delta > 0: + if delta // self.counter_window: + self._counter_flip(now) + + def counter_incr(self) -> None: + self._maybe_counter_flip() + self._counters[self._counter_idx] += 1 + + def window_overflow(self) -> bool: + self._maybe_counter_flip() + return self._counters[self._counter_idx] > self.failures + + def overflow(self) -> bool: + return self.semaphore.locked() + + def ctx(self) -> CircuitBreakerCtx: + return CircuitBreakerCtx(self) + + +class CircuitBreakerOverflow(Exception): ... + + +class CircuitBreakerWindowOverflow(Exception): ... + + +class CircuitBreakerCtx: + __slots__ = ["cb"] + + def __init__(self, cb: CircuitBreaker): + self.cb = cb + + def incr_failures(self) -> None: + self.cb.counter_incr() + + async def __aenter__(self) -> CircuitBreakerCtx: + if self.cb.overflow(): + raise CircuitBreakerOverflow + await self.cb.semaphore.acquire() + if self.cb.window_overflow(): + self.cb.semaphore.release() + raise CircuitBreakerWindowOverflow + return self + + async def __aexit__(self, exc_type: Any, exc_value: Any, exc_tb: Any) -> None: + self.cb.semaphore.release() + + +class CircuitBreakerManager: + __slots__ = ["objs"] + + def __init__( + self, + max_concurrency: int | None = None, + failures: int | None = None, + failure_window: int | None = None, + ): + concurrency = max_concurrency or settings.APIGATEWAY_PROXY_MAX_CONCURRENCY + failures = failures or settings.APIGATEWAY_PROXY_MAX_FAILURES + failure_window = failure_window or settings.APIGATEWAY_PROXY_FAILURE_WINDOW + self.objs: dict[str, CircuitBreaker] = defaultdict( + lambda: CircuitBreaker(concurrency, (failure_window, failures)) + ) + + def get(self, key: str) -> CircuitBreakerCtx: + return self.objs[key].ctx() diff --git a/src/sentry/hybridcloud/apigateway_async/middleware.py b/src/sentry/hybridcloud/apigateway_async/middleware.py new file mode 100644 index 000000000000..446210ee1836 --- /dev/null +++ b/src/sentry/hybridcloud/apigateway_async/middleware.py @@ -0,0 +1,78 @@ +from __future__ import annotations + +import asyncio +from collections.abc import Callable +from typing import Any + +from asgiref.sync import async_to_sync, iscoroutinefunction, markcoroutinefunction +from django.http.response import HttpResponseBase +from rest_framework.request import Request + +from . import proxy_request_if_needed + + +class ApiGatewayMiddleware: + """Proxy requests intended for remote silos""" + + async_capable = True + sync_capable = True + + def __init__(self, get_response: Callable[[Request], HttpResponseBase]): + self.get_response = get_response + if iscoroutinefunction(self.get_response): + markcoroutinefunction(self) + + def __call__(self, request: Request) -> Any: + if iscoroutinefunction(self): + return self.__acall__(request) + return self.get_response(request) + + async def __acall__(self, request: Request) -> HttpResponseBase: + return await self.get_response(request) # type: ignore[misc] + + def process_view( + self, + request: Request, + view_func: Callable[..., HttpResponseBase], + view_args: tuple[str], + view_kwargs: dict[str, Any], + ) -> HttpResponseBase | None: + return self._process_view_match(request, view_func, view_args, view_kwargs) + + def _process_view_match( + self, + request: Request, + view_func: Callable[..., HttpResponseBase], + view_args: tuple[str], + view_kwargs: dict[str, Any], + ) -> Any: + #: we check if we're in an async or sync runtime once, then + # overwrite the method with the actual impl. + try: + asyncio.get_running_loop() + method = self._process_view_inner + except RuntimeError: + method = self._process_view_sync # type: ignore[assignment] + setattr(self, "_process_view_match", method) + return method(request, view_func, view_args, view_kwargs) + + def _process_view_sync( + self, + request: Request, + view_func: Callable[..., HttpResponseBase], + view_args: tuple[str], + view_kwargs: dict[str, Any], + ) -> HttpResponseBase | None: + return async_to_sync(self._process_view_inner)(request, view_func, view_args, view_kwargs) + + async def _process_view_inner( + self, + request: Request, + view_func: Callable[..., HttpResponseBase], + view_args: tuple[str], + view_kwargs: dict[str, Any], + ) -> HttpResponseBase | None: + proxy_response = await proxy_request_if_needed(request, view_func, view_kwargs) + if proxy_response is not None: + return proxy_response + return None diff --git a/src/sentry/hybridcloud/apigateway_async/proxy.py b/src/sentry/hybridcloud/apigateway_async/proxy.py new file mode 100644 index 000000000000..7fed6928dfae --- /dev/null +++ b/src/sentry/hybridcloud/apigateway_async/proxy.py @@ -0,0 +1,219 @@ +""" +Utilities related to proxying a request to a cell +""" + +from __future__ import annotations + +import asyncio +import logging +from collections.abc import AsyncGenerator, AsyncIterator +from urllib.parse import urljoin, urlparse + +import httpx +from asgiref.sync import sync_to_async +from django.conf import settings +from django.http import HttpRequest, HttpResponse, JsonResponse, StreamingHttpResponse +from django.http.response import HttpResponseBase + +from sentry import options +from sentry.api.exceptions import RequestTimeout +from sentry.objectstore.endpoints.organization import get_raw_body_async +from sentry.silo.util import ( + PROXY_APIGATEWAY_HEADER, + PROXY_DIRECT_LOCATION_HEADER, + clean_outbound_headers, + clean_proxy_headers, +) +from sentry.types.cell import ( + Cell, + CellResolutionError, + get_cell_by_name, + get_cell_for_organization, +) +from sentry.utils import metrics +from sentry.utils.http import BodyAsyncWrapper + +from .circuitbreaker import ( + CircuitBreakerManager, + CircuitBreakerOverflow, + CircuitBreakerWindowOverflow, +) + +logger = logging.getLogger(__name__) + +proxy_client = httpx.AsyncClient() +circuitbreakers = CircuitBreakerManager() + +# Endpoints that handle uploaded files have higher timeouts configured +# and we need to honor those timeouts when proxying. +# See frontend/templates/sites-enabled/sentry.io in getsentry/ops +ENDPOINT_TIMEOUT_OVERRIDE = { + "sentry-api-0-chunk-upload": 90.0, + "sentry-api-0-organization-release-files": 90.0, + "sentry-api-0-project-release-files": 90.0, + "sentry-api-0-dsym-files": 90.0, + "sentry-api-0-installable-preprod-artifact-download": 90.0, + "sentry-api-0-project-preprod-artifact-download": 90.0, + "sentry-api-0-organization-preprod-artifact-size-analysis-download": 90.0, + "sentry-api-0-organization-objectstore": 90.0, +} + +# stream 0.5 MB at a time +PROXY_CHUNK_SIZE = 512 * 1024 + + +async def _stream_response_and_close(response: httpx.Response) -> AsyncGenerator[bytes]: + """Yield chunks from an httpx response and close the connection when done.""" + try: + async for chunk in response.aiter_bytes(PROXY_CHUNK_SIZE): + yield chunk + finally: + await response.aclose() + + +def _adapt_response(response: httpx.Response, remote_url: str) -> StreamingHttpResponse: + """Convert an httpx Response into a Django response.""" + + if content_type := response.headers.get("Content-Type", None): + del response.headers["Content-Type"] + new_headers = clean_outbound_headers(response.headers) + + streamed_response = StreamingHttpResponse( + streaming_content=_stream_response_and_close(response), + status=response.status_code, + content_type=content_type, + ) + + for header, value in new_headers.items(): + streamed_response[header] = value + + streamed_response[PROXY_DIRECT_LOCATION_HEADER] = remote_url + return streamed_response + + +async def _stream_request(body: AsyncIterator[bytes]) -> AsyncGenerator[bytes]: + async for chunk in body: + yield chunk + + +async def proxy_request( + request: HttpRequest, + org_id_or_slug: str, + url_name: str, +) -> HttpResponseBase: + """Take a django request object and proxy it to a remote location given an org_id_or_slug""" + + try: + cell = await sync_to_async(get_cell_for_organization)(org_id_or_slug) + except CellResolutionError as e: + logger.info("region_resolution_error", extra={"org_slug": org_id_or_slug, "error": str(e)}) + return HttpResponse(status=404) + + return await proxy_cell_request(request, cell, url_name) + + +async def proxy_error_embed_request( + request: HttpRequest, dsn: str, url_name: str +) -> HttpResponseBase | None: + try: + parsed = urlparse(dsn) + except Exception as err: + logger.info("apigateway.error_embed.invalid_dsn", extra={"dsn": dsn, "error": err}) + return None + host = parsed.netloc + app_host = urlparse(options.get("system.url-prefix")).netloc + if not host.endswith(app_host): + # Don't further parse URLs that aren't for us. + return None + + app_segments = app_host.split(".") + host_segments = host.split(".") + if len(host_segments) - len(app_segments) < 3: + # If we don't have a o123.ingest.{cell}.{app_host} style domain + # we forward to the monolith cell + cell = get_cell_by_name(settings.SENTRY_MONOLITH_REGION) + return await proxy_cell_request(request, cell, url_name) + try: + cell_offset = len(app_segments) + 1 + cell_segment = host_segments[cell_offset * -1] + cell = get_cell_by_name(cell_segment) + except Exception: + return None + + return await proxy_cell_request(request, cell, url_name) + + +async def proxy_cell_request( + request: HttpRequest, + cell: Cell, + url_name: str, +) -> HttpResponseBase: + """Take a django request object and proxy it to a cell silo""" + metric_tags = {"region": cell.name, "url_name": url_name} + target_url = urljoin(cell.address, request.path) + + content_encoding = request.headers.get("Content-Encoding") + content_length = request.headers.get("Content-Length") + header_dict = clean_proxy_headers(request.headers) + header_dict[PROXY_APIGATEWAY_HEADER] = "true" + + assert request.method is not None + query_params = request.GET + + timeout = ENDPOINT_TIMEOUT_OVERRIDE.get(url_name, settings.GATEWAY_PROXY_TIMEOUT) + + # XXX: See sentry.testutils.pytest.sentry for more information + if settings.APIGATEWAY_PROXY_SKIP_RELAY and request.path.startswith("/api/0/relays/"): + return StreamingHttpResponse(streaming_content="relay proxy skipped", status=404) + + try: + async with circuitbreakers.get(cell.name) as circuitbreaker: + if url_name == "sentry-api-0-organization-objectstore": + if content_encoding: + header_dict["Content-Encoding"] = content_encoding + data = get_raw_body_async(request) + else: + data = BodyAsyncWrapper(request.body) + # With request streaming, and without `Content-Length` header, + # `httpx` will set chunked transfer encoding. + # Upstream doesn't necessarily support this, + # thus we re-add the header if it was present in the original request. + if content_length: + header_dict["Content-Length"] = content_length + + try: + with metrics.timer("apigateway.proxy_request.duration", tags=metric_tags): + req = proxy_client.build_request( + request.method, + target_url, + headers=header_dict, + params=dict(query_params) if query_params is not None else None, + content=_stream_request(data) if data else None, # type: ignore[arg-type] + timeout=timeout, + ) + resp = await proxy_client.send(req, stream=True, follow_redirects=False) + if resp.status_code >= 502: + metrics.incr("apigateway.proxy.request_failed", tags=metric_tags) + circuitbreaker.incr_failures() + return _adapt_response(resp, target_url) + except (httpx.TimeoutException, asyncio.CancelledError): + metrics.incr("apigateway.proxy.request_timeout", tags=metric_tags) + circuitbreaker.incr_failures() + # remote silo timeout. Use DRF timeout instead + raise RequestTimeout() + except httpx.RequestError: + metrics.incr("apigateway.proxy.request_failed", tags=metric_tags) + circuitbreaker.incr_failures() + raise + except CircuitBreakerOverflow: + metrics.incr("apigateway.proxy.circuit_breaker.overflow", tags=metric_tags) + return JsonResponse( + {"error": "apigateway", "detail": "Too many requests"}, + status=429, + ) + except CircuitBreakerWindowOverflow: + metrics.incr("apigateway.proxy.circuit_breaker.rejected", tags=metric_tags) + return JsonResponse( + {"error": "apigateway", "detail": "Downstream service temporarily unavailable"}, + status=503, + ) diff --git a/src/sentry/objectstore/endpoints/organization.py b/src/sentry/objectstore/endpoints/organization.py index c6ed04945011..5bb22190d1a3 100644 --- a/src/sentry/objectstore/endpoints/organization.py +++ b/src/sentry/objectstore/endpoints/organization.py @@ -6,12 +6,13 @@ from wsgiref.util import is_hop_by_hop import requests +from asgiref.sync import sync_to_async from django.http import HttpRequest, StreamingHttpResponse from requests import Response as ExternalResponse from rest_framework.request import Request from rest_framework.response import Response -from sentry.utils.http import BodyWithLength +from sentry.utils.http import BodyAsyncWrapper, BodyWithLength # TODO(granian): Remove this and related code paths when we fully switch from uwsgi to granian uwsgi: Any = None @@ -150,6 +151,32 @@ def stream_generator(): return BodyWithLength(request) +def get_raw_body_async( + request: HttpRequest, +) -> BodyAsyncWrapper | ChunkedEncodingAsyncDecoder | BodyWithLength: + if request.body: + return BodyAsyncWrapper(request.body) + + wsgi_input = request.META.get("wsgi.input") + if "granian" in request.META.get("SERVER_SOFTWARE", "").lower(): + return BodyAsyncWrapper(wsgi_input) + + # wsgiref will raise an exception and hang when attempting to read wsgi.input while there's no body. + # For now, support bodies only on PUT and POST requests when not using Granian. + if request.method not in ("PUT", "POST"): + return BodyAsyncWrapper(b"") + + # wsgiref (dev/test server) + if ( + hasattr(wsgi_input, "_read") + and request.headers.get("Transfer-Encoding", "").lower() == "chunked" + ): + return ChunkedEncodingAsyncDecoder(wsgi_input._read) # type: ignore[union-attr] + + # wsgiref and the request has been already proxied through control silo + return BodyWithLength(request) + + def get_target_url(path: str) -> str: base = options.get("objectstore.config")["base_url"].rstrip("/") # `path` should be a relative path, only grab that part @@ -248,3 +275,13 @@ def read(self, size: int = -1) -> bytes: raise ValueError("Malformed chunk encoded stream") return b"".join(buffer) + + +class ChunkedEncodingAsyncDecoder(ChunkedEncodingDecoder): + def __aiter__(self): + return self + + async def __anext__(self) -> bytes: + if self._done: + raise StopAsyncIteration + return await sync_to_async(self.read)() diff --git a/src/sentry/runner/commands/devserver.py b/src/sentry/runner/commands/devserver.py index ba9795f7c67f..6a1e8a526b85 100644 --- a/src/sentry/runner/commands/devserver.py +++ b/src/sentry/runner/commands/devserver.py @@ -471,9 +471,11 @@ def devserver( "SENTRY_SILO_DEVSERVER": "1", "SENTRY_SILO_MODE": "CONTROL", "SENTRY_REGION": "", + "SENTRY_APIGW_ASYNC": "true", "SENTRY_CONTROL_SILO_PORT": server_port, "SENTRY_REGION_SILO_PORT": str(ports["region.server"]), "SENTRY_DEVSERVER_BIND": f"127.0.0.1:{server_port}", + "SENTRY_GRANIAN_IFACE": "asginl", "SENTRY_GRANIAN_PORT": str(ports["server"]), "SENTRY_GRANIAN_WORKERS": "2", } diff --git a/src/sentry/services/http.py b/src/sentry/services/http.py index 9678945b00c4..f515a25a28a1 100644 --- a/src/sentry/services/http.py +++ b/src/sentry/services/http.py @@ -5,7 +5,6 @@ from typing import Any from granian import Granian -from granian.constants import Interfaces as GranianInterfaces from sentry.services.base import Service @@ -15,7 +14,7 @@ def _run_server(options: dict[str, Any]): target=options["module"], address=options["host"], port=options["port"], - interface=GranianInterfaces.WSGI, + interface=options["iface"], workers=options["workers"], backlog=options["backlog"], workers_kill_timeout=options["workers-kill-timeout"], @@ -51,6 +50,7 @@ def __init__( host = host or settings.SENTRY_WEB_HOST port = port or int(os.environ.get("SENTRY_GRANIAN_PORT", "0")) or settings.SENTRY_WEB_PORT workers = workers or int(os.environ.get("SENTRY_GRANIAN_WORKERS", "1")) + iface = os.environ.get("SENTRY_GRANIAN_IFACE", "wsgi") reload = bool(os.environ.get("SENTRY_GRANIAN_RELOAD")) options = (settings.SENTRY_WEB_OPTIONS or {}).copy() @@ -58,7 +58,8 @@ def __init__( for k, v in extra_options.items(): options[k] = v - options.setdefault("module", "sentry.wsgi:application") + options.setdefault("module", f"sentry.{iface[:4]}:application") + options.setdefault("iface", iface) options.setdefault("host", host) options.setdefault("port", port) options.setdefault("workers", workers) diff --git a/src/sentry/silo/base.py b/src/sentry/silo/base.py index 91dade5a9e59..cccbf81eadcb 100644 --- a/src/sentry/silo/base.py +++ b/src/sentry/silo/base.py @@ -3,7 +3,6 @@ import abc import contextlib import functools -import threading import typing from collections.abc import Callable, Generator, Iterable from enum import Enum @@ -56,7 +55,7 @@ def get_current_mode(cls) -> SiloMode: return SingleProcessSiloModeState.get_mode() or process_level_silo_mode -class SingleProcessSiloModeState(threading.local): +class SingleProcessSiloModeState: """ Used by silo endpoint decorators and other contexts that help 'suggest' to acceptance testing and local single process silo testing which 'silo context' the diff --git a/src/sentry/testutils/asserts.py b/src/sentry/testutils/asserts.py index e8c07edd4832..5b08264859f6 100644 --- a/src/sentry/testutils/asserts.py +++ b/src/sentry/testutils/asserts.py @@ -2,7 +2,6 @@ from django.db import models from django.db.models.functions import Cast -from django.http import StreamingHttpResponse from sentry.constants import ObjectStatus from sentry.integrations.types import EventLifecycleOutcome @@ -44,10 +43,7 @@ def assert_commit_shape(commit): def assert_status_code(response, minimum: int, maximum: int | None = None): # Omit max to assert status_code == minimum. maximum = maximum or minimum + 1 - assert minimum <= response.status_code < maximum, ( - response.status_code, - response.getvalue() if isinstance(response, StreamingHttpResponse) else response.content, - ) + assert minimum <= response.status_code < maximum, response def assert_existing_projects_status( diff --git a/src/sentry/testutils/cases.py b/src/sentry/testutils/cases.py index a1202f16f7f4..486513a7ca3e 100644 --- a/src/sentry/testutils/cases.py +++ b/src/sentry/testutils/cases.py @@ -17,6 +17,7 @@ from uuid import UUID, uuid4 from zlib import compress +import httpx import pytest import requests import responses @@ -36,7 +37,6 @@ from django.utils import timezone from django.utils.functional import cached_property from google.protobuf.timestamp_pb2 import Timestamp -from requests.utils import CaseInsensitiveDict, get_encoding_from_headers from rest_framework import status from rest_framework.request import Request from rest_framework.response import Response @@ -678,36 +678,45 @@ def get_cursor_headers(self, response): def api_gateway_proxy_stubbed(self): """Mocks a fake api gateway proxy that redirects via Client objects""" - def proxy_raw_request( - method: str, - url: str, - headers: Mapping[str, str], - params: Mapping[str, str] | None, - data: Any, - **kwds: Any, - ) -> requests.Response: - from django.test.client import Client - - client = Client() - extra: Mapping[str, Any] = { - f"HTTP_{k.replace('-', '_').upper()}": v for k, v in headers.items() - } - if params: - url += "?" + urlencode(params) - with assume_test_silo_mode(SiloMode.CELL): - resp = getattr(client, method.lower())( - url, b"".join(data), headers["Content-Type"], **extra - ) - response = requests.Response() - response.status_code = resp.status_code - response.headers = CaseInsensitiveDict(resp.headers) - response.encoding = get_encoding_from_headers(response.headers) - response.raw = BytesIO(resp.content) - return response + from asgiref.sync import sync_to_async + from django.test.client import Client + + class MockedProxy: + def __init__(self): + self.client = Client() + + @staticmethod + async def _consume_body(content): + ret = b"" + async for chunk in content: + ret += chunk + return ret + + def build_request(self, method, url, headers, params, content, timeout): + assert not params + target = getattr(self.client, method.lower()) + content_type = headers.pop("Content-Type", "application/octet-stream") + extra: Mapping[str, Any] = { + f"HTTP_{k.replace('-', '_').upper()}": v for k, v in headers.items() + } + return target, (url, content, content_type), extra + + async def send(self, req, stream, follow_redirects): + with assume_test_silo_mode(SiloMode.CELL): + url, content, content_type = req[1] + content = await self._consume_body(content) + resp = await sync_to_async(req[0])(url, content, content_type, **req[2]) + wresp = httpx.Response( + status_code=resp.status_code, + headers=dict(resp.headers), + content=resp.content, + ) + return wresp + mock_client = MockedProxy() with mock.patch( - "sentry.hybridcloud.apigateway.proxy.external_request", - new=proxy_raw_request, + "sentry.hybridcloud.apigateway_async.proxy.proxy_client", + new=mock_client, ): yield diff --git a/src/sentry/testutils/helpers/apigateway.py b/src/sentry/testutils/helpers/apigateway.py index 08e3c4ac4bf6..bc3cde3f74f5 100644 --- a/src/sentry/testutils/helpers/apigateway.py +++ b/src/sentry/testutils/helpers/apigateway.py @@ -1,8 +1,12 @@ from __future__ import annotations +from collections.abc import Callable +from contextlib import contextmanager +from typing import Any +from unittest.mock import patch from urllib.parse import parse_qs -import responses +import httpx from django.conf import settings from django.http import HttpResponseRedirect from django.test import override_settings @@ -75,14 +79,98 @@ def get(self, request: Request) -> Response: ] + api_urls.urlpatterns +# Type for httpx mock callback: receives httpx.Request, returns (status, headers, body) +HttpxCallback = Callable[[httpx.Request], tuple[int, dict[str, str], str | bytes]] + + +class HttpxMockRouter: + """Mock HTTP router for httpx, replacing the `responses` library for async proxy tests.""" + + def __init__(self) -> None: + self._routes: list[dict[str, Any]] = [] + + def add( + self, + method: str, + url: str, + body: str | bytes = b"", + status_code: int = 200, + headers: dict[str, str] | None = None, + json_data: Any | None = None, + content_type: str | None = None, + ) -> None: + if json_data is not None: + body = json.dumps(json_data).encode() + content_type = content_type or "application/json" + elif isinstance(body, str): + body = body.encode() + resp_headers = dict(headers or {}) + if content_type: + resp_headers["Content-Type"] = content_type + self._routes.append( + { + "method": method.upper(), + "url": url, + "body": body, + "status_code": status_code, + "headers": resp_headers, + } + ) + + def add_callback(self, method: str, url: str, callback: HttpxCallback) -> None: + self._routes.append( + { + "method": method.upper(), + "url": url, + "callback": callback, + } + ) + + def handler(self, request: httpx.Request) -> httpx.Response: + url_str = str(request.url) + # Strip query params for matching + url_path = url_str.split("?")[0] + for route in self._routes: + if request.method != route["method"]: + continue + route_url = route["url"].split("?")[0] + if url_path != route_url: + continue + + if "callback" in route: + status_code, headers, body = route["callback"](request) + if isinstance(body, str): + body = body.encode() + return httpx.Response( + status_code, headers=dict(headers), content=body, request=request + ) + else: + return httpx.Response( + route["status_code"], + headers=route["headers"], + content=route["body"], + request=request, + ) + + raise ValueError(f"No mock route matched: {request.method} {url_str}") + + +@contextmanager +def mock_proxy_client(router: HttpxMockRouter): + """Patch the proxy_client with a mock httpx.AsyncClient using the given router.""" + mock_client = httpx.AsyncClient(transport=httpx.MockTransport(router.handler)) + with patch("sentry.hybridcloud.apigateway_async.proxy.proxy_client", mock_client): + yield mock_client + + def verify_request_body(body, headers): - """Wrapper for a callback function for responses.add_callback""" + """Wrapper for a callback function for HttpxMockRouter.add_callback.""" - def request_callback(request): + def request_callback(request: httpx.Request): if request.headers.get("content-type") == "application/json": - assert json.load(request.body) == body + assert json.loads(request.content) == body else: - assert request.body.read() == body + assert request.content == (body if isinstance(body, bytes) else body.encode()) assert (request.headers[key] == headers[key] for key in headers) return 200, {}, json.dumps({"proxy": True}) @@ -90,9 +178,9 @@ def request_callback(request): def verify_request_headers(headers): - """Wrapper for a callback function for responses.add_callback""" + """Wrapper for a callback function for HttpxMockRouter.add_callback.""" - def request_callback(request): + def request_callback(request: httpx.Request): assert (request.headers[key] == headers[key] for key in headers) return 200, {}, json.dumps({"proxy": True}) @@ -100,10 +188,10 @@ def request_callback(request): def verify_request_params(params, headers): - """Wrapper for a callback function for responses.add_callback""" + """Wrapper for a callback function for HttpxMockRouter.add_callback.""" - def request_callback(request): - request_params = parse_qs(request.url.split("?")[1]) + def request_callback(request: httpx.Request): + request_params = parse_qs(str(request.url).split("?")[1]) assert (request.headers[key] == headers[key] for key in headers) for key in params: assert key in request_params @@ -117,10 +205,10 @@ def request_callback(request): def verify_file_body(file_body, headers): - """Wrapper for a callback function for responses.add_callback""" + """Wrapper for a callback function for HttpxMockRouter.add_callback.""" - def request_callback(request): - assert file_body in request.body.read() + def request_callback(request: httpx.Request): + assert file_body in request.content assert (request.headers[key] == headers[key] for key in headers) return 200, {}, json.dumps({"proxy": True}) @@ -129,8 +217,10 @@ def request_callback(request): def provision_middleware(): middleware = list(settings.MIDDLEWARE) - if "sentry.hybridcloud.apigateway.middleware.ApiGatewayMiddleware" not in middleware: - middleware = ["sentry.hybridcloud.apigateway.middleware.ApiGatewayMiddleware"] + middleware + if "sentry.hybridcloud.apigateway_async.middleware.ApiGatewayMiddleware" not in middleware: + middleware = [ + "sentry.hybridcloud.apigateway_async.middleware.ApiGatewayMiddleware" + ] + middleware return middleware @@ -148,34 +238,42 @@ class ApiGatewayTestCase(APITestCase): def setUp(self): super().setUp() - responses.add( - responses.GET, + self.httpx_router = HttpxMockRouter() + self.httpx_router.add( + "GET", f"{self.CELL.address}/get", body=json.dumps({"proxy": True}), content_type="application/json", - adding_headers={"test": "header"}, + headers={"test": "header"}, ) - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/error", body=json.dumps({"proxy": True}), - status=400, + status_code=400, content_type="application/json", - adding_headers={"test": "header"}, + headers={"test": "header"}, ) self.organization = self.create_organization(region=self.CELL) # Echos the request body and header back for verification - def return_request_body(request): - return (200, request.headers, request.body) + def return_request_body(request: httpx.Request): + return (200, dict(request.headers), request.content) # Echos the query params and header back for verification - def return_request_params(request): - params = parse_qs(request.url.split("?")[1]) - return (200, request.headers, json.dumps(params).encode()) + def return_request_params(request: httpx.Request): + params = parse_qs(str(request.url).split("?")[1]) + return (200, dict(request.headers), json.dumps(params).encode()) - responses.add_callback(responses.GET, f"{self.CELL.address}/echo", return_request_params) - responses.add_callback(responses.POST, f"{self.CELL.address}/echo", return_request_body) + self.httpx_router.add_callback("GET", f"{self.CELL.address}/echo", return_request_params) + self.httpx_router.add_callback("POST", f"{self.CELL.address}/echo", return_request_body) self.middleware = provision_middleware() + # Enter the mock proxy client context for the duration of the test + self._mock_proxy_ctx = mock_proxy_client(self.httpx_router) + self._mock_proxy_ctx.__enter__() + + def tearDown(self): + self._mock_proxy_ctx.__exit__(None, None, None) + super().tearDown() diff --git a/src/sentry/testutils/helpers/response.py b/src/sentry/testutils/helpers/response.py index d6681e93c6f8..87d359c443bc 100644 --- a/src/sentry/testutils/helpers/response.py +++ b/src/sentry/testutils/helpers/response.py @@ -2,6 +2,7 @@ from typing import TypeGuard +from asgiref.sync import async_to_sync from django.http.response import HttpResponseBase, StreamingHttpResponse from rest_framework.response import Response @@ -16,11 +17,23 @@ def is_streaming_response(response: HttpResponseBase) -> TypeGuard[StreamingHttp return isinstance(response, StreamingHttpResponse) +async def _async_streaming_response_content(response: StreamingHttpResponse) -> bytes: + data = [] + async for chunk in response: + data.append(chunk) + iterator = response._iterator # type: ignore[attr-defined] + if hasattr(iterator, "aclose"): + await iterator.aclose() + return b"".join(data) + + def close_streaming_response(response: HttpResponseBase) -> bytes: """Exhausts the streamed file in a response. - When the file is exahusted, this underlying file descriptor is closed + When the file is exhausted, the underlying file descriptor is closed avoiding a `ResourceWarning`. """ assert isinstance(response, StreamingHttpResponse) + if response.is_async: + return async_to_sync(_async_streaming_response_content)(response) return response.getvalue() diff --git a/src/sentry/testutils/silo.py b/src/sentry/testutils/silo.py index 98a1be33456b..74a79df4be2b 100644 --- a/src/sentry/testutils/silo.py +++ b/src/sentry/testutils/silo.py @@ -1,12 +1,12 @@ from __future__ import annotations import contextlib +import contextvars import functools import inspect import os import re import sys -import threading import typing from collections.abc import Callable, Collection, Generator, Iterable, Mapping, MutableSet, Sequence from contextlib import contextmanager, nullcontext @@ -35,46 +35,43 @@ def monkey_patch_single_process_silo_mode_state(): - class LocalSiloModeState(threading.local): - mode: SiloMode | None = None - cell: Cell | None = None - - state = LocalSiloModeState() + _silo_mode_var: contextvars.ContextVar[SiloMode | None] = contextvars.ContextVar( + "silo_mode", default=None + ) + _silo_cell_var: contextvars.ContextVar[Cell | None] = contextvars.ContextVar( + "silo_cell", default=None + ) @contextlib.contextmanager def enter(mode: SiloMode, cell: Cell | None = None) -> Generator[None]: - assert state.mode is None, ( + assert _silo_mode_var.get() is None, ( "Re-entrant invariant broken! Use exit_single_process_silo_context " "to explicit pass 'fake' RPC boundaries." ) - old_mode = state.mode - old_cell = state.cell - state.mode = mode - state.cell = cell + mode_token = _silo_mode_var.set(mode) + cell_token = _silo_cell_var.set(cell) try: yield finally: - state.mode = old_mode - state.cell = old_cell + _silo_mode_var.reset(mode_token) + _silo_cell_var.reset(cell_token) @contextlib.contextmanager def exit() -> Generator[None]: - old_mode = state.mode - old_cell = state.cell - state.mode = None - state.cell = None + mode_token = _silo_mode_var.set(None) + cell_token = _silo_cell_var.set(None) try: yield finally: - state.mode = old_mode - state.cell = old_cell + _silo_mode_var.reset(mode_token) + _silo_cell_var.reset(cell_token) def get_mode() -> SiloMode | None: - return state.mode + return _silo_mode_var.get() def get_cell() -> Cell | None: - return state.cell + return _silo_cell_var.get() SingleProcessSiloModeState.enter = staticmethod(enter) # type: ignore[method-assign] SingleProcessSiloModeState.exit = staticmethod(exit) # type: ignore[method-assign] diff --git a/src/sentry/utils/http.py b/src/sentry/utils/http.py index 8cf3f48dca91..53d15211b2d8 100644 --- a/src/sentry/utils/http.py +++ b/src/sentry/utils/http.py @@ -1,9 +1,10 @@ from __future__ import annotations from collections.abc import Collection, Iterator -from typing import TYPE_CHECKING, NamedTuple, TypeGuard, overload +from typing import TYPE_CHECKING, Any, NamedTuple, TypeGuard, overload from urllib.parse import quote, urljoin, urlparse +from asgiref.sync import sync_to_async from django.conf import settings from django.http import HttpRequest @@ -226,6 +227,32 @@ def is_using_customer_domain(request: HttpRequest) -> TypeGuard[_HttpRequestWith return bool(hasattr(request, "subdomain") and request.subdomain) +class BodyAsyncWrapper: + def __init__(self, body: Any): + self._bool = bool(body) + self.body = [body] if isinstance(body, bytes) else body + + def __bool__(self) -> bool: + return self._bool + + def __aiter__(self): + return BodyAsyncIter(self) + + +class BodyAsyncIter: + def __init__(self, parent: BodyAsyncWrapper): + self.biter = iter(parent.body) + + def _anext(self): + try: + return self.biter.__next__() + except StopIteration: + raise StopAsyncIteration + + async def __anext__(self) -> bytes: + return await sync_to_async(self._anext)() + + class BodyWithLength: """Wraps an HttpRequest with a __len__ so that the requests library does not assume length=0 in all cases""" @@ -235,8 +262,25 @@ def __init__(self, request: HttpRequest): def __iter__(self) -> Iterator[bytes]: return iter(self.request) + def __aiter__(self) -> BodyWithLengthAiter: + return BodyWithLengthAiter(self) + def __len__(self) -> int: return int(self.request.headers.get("Content-Length", "0")) def read(self, size: int | None = None) -> bytes: return self.request.read(size) + + +class BodyWithLengthAiter: + def __init__(self, parent: BodyWithLength): + self.biter = iter(parent.request) + + def _anext(self): + try: + return self.biter.__next__() + except StopIteration: + raise StopAsyncIteration + + async def __anext__(self) -> bytes: + return await sync_to_async(self._anext)() diff --git a/src/sentry/utils/sdk.py b/src/sentry/utils/sdk.py index a71e5ba4c11f..04d7916bdcb0 100644 --- a/src/sentry/utils/sdk.py +++ b/src/sentry/utils/sdk.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import copy import logging import sys @@ -268,6 +269,15 @@ def before_send_log(log: Log, _: Hint) -> Log | None: if attributes.get("sentry.message.template") == "New partitions assigned: %r": return None + try: + # FIXME: when in ASGI, the call to `options.store` from `in_random_rollout` + # would fail, because of SyncOnlyOperation. + # While we should ideally figure out how to actually fix this, + # for the moment let's just simplify and skip this entirely. + asyncio.get_running_loop() + return None + except Exception: + pass if in_random_rollout("ourlogs.sentry-emit-rollout"): return log return None diff --git a/tests/acceptance/test_proxy.py b/tests/acceptance/test_proxy.py index b49285d62305..98c8a39871bd 100644 --- a/tests/acceptance/test_proxy.py +++ b/tests/acceptance/test_proxy.py @@ -17,6 +17,7 @@ from sentry.testutils.cases import TransactionTestCase from sentry.testutils.cell import override_cells from sentry.testutils.factories import Factories +from sentry.testutils.helpers.response import close_streaming_response from sentry.testutils.silo import cell_silo_test from sentry.types.cell import Cell from sentry.utils import json @@ -69,6 +70,6 @@ def test_through_api_gateway(self) -> None: ) assert_status_code(resp, 201) - result = json.loads(resp.getvalue()) + result = json.loads(close_streaming_response(resp)) team = Team.objects.get(id=result["id"]) assert team.idp_provisioned diff --git a/tests/conftest.py b/tests/conftest.py index 4f47d7f85ccf..efb241a151aa 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -9,6 +9,9 @@ import responses import sentry_sdk +# Set async apigateway as soon as possible +os.environ["SENTRY_APIGW_ASYNC"] = "true" + # Disable crash recovery server in pytest-rerunfailures. Under xdist, Sentry's # global socket.setdefaulttimeout(5) causes the server's per-worker recv threads # to die during Django init (~10s), silently breaking crash recovery anyway. diff --git a/tests/sentry/hybridcloud/apigateway/test_apigateway.py b/tests/sentry/hybridcloud/apigateway/test_apigateway.py index 723305c5ab85..f59eedc88d2f 100644 --- a/tests/sentry/hybridcloud/apigateway/test_apigateway.py +++ b/tests/sentry/hybridcloud/apigateway/test_apigateway.py @@ -1,7 +1,6 @@ from urllib.parse import urlencode import pytest -import responses from django.conf import settings from django.test import override_settings from django.urls import get_resolver, reverse @@ -16,12 +15,11 @@ @control_silo_test(cells=[ApiGatewayTestCase.CELL], include_monolith_run=True) class ApiGatewayTest(ApiGatewayTestCase): - @responses.activate def test_simple(self) -> None: query_params = dict(foo="test", bar=["one", "two"]) headers = dict(example="this") - responses.add_callback( - responses.GET, + self.httpx_router.add_callback( + "GET", f"{self.CELL.address}/organizations/{self.organization.slug}/region/", verify_request_params(query_params, headers), ) @@ -40,13 +38,12 @@ def test_simple(self) -> None: resp_json = json.loads(close_streaming_response(resp)) assert resp_json["proxy"] is True - @responses.activate def test_proxy_does_not_resolve_redirect(self) -> None: - responses.add( - responses.POST, + self.httpx_router.add( + "POST", f"{self.CELL.address}/organizations/{self.organization.slug}/region/", headers={"Location": "https://zombo.com"}, - status=302, + status_code=302, ) url = reverse("region-endpoint", kwargs={"organization_slug": self.organization.slug}) @@ -61,7 +58,6 @@ def test_proxy_does_not_resolve_redirect(self) -> None: response_payload = close_streaming_response(resp) assert response_payload == b"" - @responses.activate def test_cell_pinned_urls_are_defined(self) -> None: resolver = get_resolver() # Ensure that all urls in REGION_PINNED_URL_NAMES exist in api/urls.py @@ -73,18 +69,17 @@ def test_cell_pinned_urls_are_defined(self) -> None: f"REGION_PINNED_URL_NAMES contains {name}, but no route is registered with that name" ) - @responses.activate def test_proxy_check_org_slug_url(self) -> None: """Test the logic of when a request should be proxied""" - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/organizations/{self.organization.slug}/region/", - json={"proxy": True}, + json_data={"proxy": True}, ) - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/organizations/{self.organization.slug}/control/", - json={"proxy": True}, + json_data={"proxy": True}, ) region_url = reverse( @@ -109,28 +104,27 @@ def test_proxy_check_org_slug_url(self) -> None: assert resp.status_code == 200 assert resp.data["proxy"] is False - @responses.activate def test_proxy_check_org_id_or_slug_url_with_params(self) -> None: """Test the logic of when a request should be proxied""" - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/organizations/{self.organization.slug}/region/", - json={"proxy": True}, + json_data={"proxy": True}, ) - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/organizations/{self.organization.slug}/control/", - json={"proxy": True}, + json_data={"proxy": True}, ) - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/organizations/{self.organization.id}/region/", - json={"proxy": True}, + json_data={"proxy": True}, ) - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/organizations/{self.organization.id}/control/", - json={"proxy": True}, + json_data={"proxy": True}, ) region_url_slug = reverse( @@ -178,13 +172,12 @@ def test_proxy_check_org_id_or_slug_url_with_params(self) -> None: assert resp.status_code == 200 assert resp.data["proxy"] is False - @responses.activate def test_proxy_check_region_pinned_url(self) -> None: project_key = self.create_project_key(self.project) - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/js-sdk-loader/{project_key.public_key}.js", - json={"proxy": True}, + json_data={"proxy": True}, ) # No /api/0 as we only include sentry.api.urls.urlpatterns @@ -205,17 +198,16 @@ def test_proxy_check_region_pinned_url(self) -> None: assert resp.status_code == 200 assert resp.data["proxy"] is False - @responses.activate def test_proxy_check_cell_pinned_url_with_params(self) -> None: - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/relays/register/", - json={"proxy": True}, + json_data={"proxy": True}, ) - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/relays/abc123/", - json={"proxy": True, "details": True}, + json_data={"proxy": True, "details": True}, ) with override_settings(SILO_MODE=SiloMode.CONTROL, MIDDLEWARE=tuple(self.middleware)): @@ -230,18 +222,17 @@ def test_proxy_check_cell_pinned_url_with_params(self) -> None: assert resp_json["proxy"] is True assert resp_json["details"] is True - @responses.activate def test_proxy_check_cell_pinned_issue_urls(self) -> None: issue = self.create_group() - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/issues/{issue.id}/", - json={"proxy": True, "id": issue.id}, + json_data={"proxy": True, "id": issue.id}, ) - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/issues/{issue.id}/events/", - json={"proxy": True, "id": issue.id, "events": True}, + json_data={"proxy": True, "id": issue.id, "events": True}, ) # No /api/0 as we only include sentry.api.urls.urlpatterns @@ -262,12 +253,11 @@ def test_proxy_check_cell_pinned_issue_urls(self) -> None: assert resp_json["proxy"] is True assert resp_json["events"] - @responses.activate def test_proxy_error_embed_dsn(self) -> None: - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/api/embed/error-page/", - json={"proxy": True, "name": "error-embed"}, + json_data={"proxy": True, "name": "error-embed"}, ) with override_settings(SILO_MODE=SiloMode.CONTROL, MIDDLEWARE=tuple(self.middleware)): # no dsn diff --git a/tests/sentry/hybridcloud/apigateway/test_apigateway_helpers.py b/tests/sentry/hybridcloud/apigateway/test_apigateway_helpers.py index f07005d6d737..2064d9863d82 100644 --- a/tests/sentry/hybridcloud/apigateway/test_apigateway_helpers.py +++ b/tests/sentry/hybridcloud/apigateway/test_apigateway_helpers.py @@ -1,7 +1,4 @@ -from io import BytesIO - -import requests -import responses +import httpx from sentry.testutils.helpers.apigateway import ApiGatewayTestCase, verify_request_body from sentry.testutils.silo import no_silo_test @@ -10,14 +7,16 @@ @no_silo_test(cells=[ApiGatewayTestCase.CELL]) class VerifyRequestBodyTest(ApiGatewayTestCase): - @responses.activate def test_verify_request_body(self) -> None: body = {"ab": "cd"} headers = {"header": "nope", "content-type": "application/json"} - responses.add_callback( - responses.POST, "http://ab.cd.e/test", verify_request_body(body, headers) - ) - resp = requests.post( - "http://ab.cd.e/test", data=BytesIO(json.dumps(body).encode("utf8")), headers=headers + callback = verify_request_body(body, headers) + + mock_request = httpx.Request( + "POST", + "http://ab.cd.e/test", + headers=headers, + content=json.dumps(body).encode("utf8"), ) - assert resp.status_code == 200 + status_code, resp_headers, resp_body = callback(mock_request) + assert status_code == 200 diff --git a/tests/sentry/hybridcloud/apigateway/test_proxy.py b/tests/sentry/hybridcloud/apigateway/test_proxy.py index c8a66042dfc7..68bbabf46459 100644 --- a/tests/sentry/hybridcloud/apigateway/test_proxy.py +++ b/tests/sentry/hybridcloud/apigateway/test_proxy.py @@ -1,22 +1,16 @@ -from unittest.mock import MagicMock, patch from urllib.parse import urlencode -import pytest -import requests -import responses +import httpx +from asgiref.sync import async_to_sync from django.core.files.uploadedfile import SimpleUploadedFile -from django.http import HttpResponse from django.test.client import RequestFactory -from requests.exceptions import ConnectionError, Timeout -from sentry.api.exceptions import RequestTimeout -from sentry.hybridcloud.apigateway.proxy import proxy_request +from sentry.hybridcloud.apigateway_async.proxy import proxy_request as _proxy_request from sentry.silo.util import ( INVALID_OUTBOUND_HEADERS, PROXY_APIGATEWAY_HEADER, PROXY_DIRECT_LOCATION_HEADER, ) -from sentry.testutils.helpers import override_options from sentry.testutils.helpers.apigateway import ( ApiGatewayTestCase, verify_file_body, @@ -27,12 +21,12 @@ from sentry.testutils.silo import control_silo_test from sentry.utils import json +proxy_request = async_to_sync(_proxy_request) url_name = "sentry-api-0-projets" @control_silo_test(cells=[ApiGatewayTestCase.CELL], include_monolith_run=True) class ProxyTestCase(ApiGatewayTestCase): - @responses.activate def test_simple(self) -> None: request = RequestFactory().get("http://sentry.io/get") resp = proxy_request(request, self.organization.slug, url_name) @@ -54,7 +48,6 @@ def test_simple(self) -> None: assert resp.has_header(PROXY_DIRECT_LOCATION_HEADER) assert resp[PROXY_DIRECT_LOCATION_HEADER] == "http://us.internal.sentry.io/error" - @responses.activate def test_query_params(self) -> None: query_param_dict = dict(foo="bar", numlist=["1", "2", "3"]) query_param_str = urlencode(query_param_dict, doseq=True) @@ -68,17 +61,15 @@ def test_query_params(self) -> None: assert query_param_dict["foo"] == resp_json["foo"][0] assert query_param_dict["numlist"] == resp_json["numlist"] - @responses.activate def test_bad_org(self) -> None: request = RequestFactory().get("http://sentry.io/get") resp = proxy_request(request, "doesnotexist", url_name) assert resp.status_code == 404 - @responses.activate def test_post(self) -> None: request_body = {"foo": "bar", "nested": {"int_list": [1, 2, 3]}} - responses.add_callback( - responses.POST, + self.httpx_router.add_callback( + "POST", "http://us.internal.sentry.io/post", verify_request_body(request_body, {"test": "header"}), ) @@ -94,11 +85,10 @@ def test_post(self) -> None: assert resp.has_header(PROXY_DIRECT_LOCATION_HEADER) assert resp[PROXY_DIRECT_LOCATION_HEADER] == "http://us.internal.sentry.io/post" - @responses.activate def test_put(self) -> None: request_body = {"foo": "bar", "nested": {"int_list": [1, 2, 3]}} - responses.add_callback( - responses.PUT, + self.httpx_router.add_callback( + "PUT", "http://us.internal.sentry.io/put", verify_request_body(request_body, {"test": "header"}), ) @@ -114,11 +104,10 @@ def test_put(self) -> None: assert resp.has_header(PROXY_DIRECT_LOCATION_HEADER) assert resp[PROXY_DIRECT_LOCATION_HEADER] == "http://us.internal.sentry.io/put" - @responses.activate def test_patch(self) -> None: request_body = {"foo": "bar", "nested": {"int_list": [1, 2, 3]}} - responses.add_callback( - responses.PATCH, + self.httpx_router.add_callback( + "PATCH", "http://us.internal.sentry.io/patch", verify_request_body(request_body, {"test": "header"}), ) @@ -134,11 +123,10 @@ def test_patch(self) -> None: assert resp.has_header(PROXY_DIRECT_LOCATION_HEADER) assert resp[PROXY_DIRECT_LOCATION_HEADER] == "http://us.internal.sentry.io/patch" - @responses.activate def test_head(self) -> None: request_body = {"foo": "bar", "nested": {"int_list": [1, 2, 3]}} - responses.add_callback( - responses.HEAD, + self.httpx_router.add_callback( + "HEAD", "http://us.internal.sentry.io/head", verify_request_headers({"test": "header"}), ) @@ -154,11 +142,10 @@ def test_head(self) -> None: assert resp.has_header(PROXY_DIRECT_LOCATION_HEADER) assert resp[PROXY_DIRECT_LOCATION_HEADER] == "http://us.internal.sentry.io/head" - @responses.activate def test_delete(self) -> None: request_body = {"foo": "bar", "nested": {"int_list": [1, 2, 3]}} - responses.add_callback( - responses.DELETE, + self.httpx_router.add_callback( + "DELETE", "http://us.internal.sentry.io/delete", verify_request_body(request_body, {"test": "header"}), ) @@ -174,7 +161,6 @@ def test_delete(self) -> None: assert resp.has_header(PROXY_DIRECT_LOCATION_HEADER) assert resp[PROXY_DIRECT_LOCATION_HEADER] == "http://us.internal.sentry.io/delete" - @responses.activate def test_file_upload(self) -> None: foo = dict(test="a", file="b", what="c") contents = json.dumps(foo).encode() @@ -183,8 +169,8 @@ def test_file_upload(self) -> None: "foo": "bar", } - responses.add_callback( - responses.POST, + self.httpx_router.add_callback( + "POST", "http://us.internal.sentry.io/post", verify_file_body(contents, {"test": "header"}), ) @@ -197,14 +183,13 @@ def test_file_upload(self) -> None: assert resp.status_code == 200 assert resp_json["proxy"] - @responses.activate def test_alternate_content_type(self) -> None: # Check form encoded files also work foo = dict(test="a", file="b", what="c") contents = urlencode(foo, doseq=True).encode("utf-8") request_body = contents - responses.add_callback( - responses.POST, + self.httpx_router.add_callback( + "POST", "http://us.internal.sentry.io/post", verify_request_body(contents, {"test": "header"}), ) @@ -219,16 +204,15 @@ def test_alternate_content_type(self) -> None: assert resp.status_code == 200 assert resp_json["proxy"] - @responses.activate def test_apply_apigateway_proxy_header(self) -> None: - def request_callback(request: requests.PreparedRequest) -> tuple[int, dict[str, str], str]: + def request_callback(request: httpx.Request) -> tuple[int, dict[str, str], str]: assert request.headers.get(PROXY_APIGATEWAY_HEADER), ( "Proxied requests should have a header added" ) return 200, {"proxied": "yes"}, json.dumps({"success": True}) - responses.add_callback( - responses.POST, + self.httpx_router.add_callback( + "POST", "http://us.internal.sentry.io/post", request_callback, ) @@ -243,11 +227,10 @@ def request_callback(request: requests.PreparedRequest) -> tuple[int, dict[str, assert resp.status_code == 200 assert resp["proxied"] == "yes" - @responses.activate def test_strip_request_headers(self) -> None: request_body = {"foo": "bar", "nested": {"int_list": [1, 2, 3]}} - responses.add_callback( - responses.POST, + self.httpx_router.add_callback( + "POST", "http://us.internal.sentry.io/post", verify_request_body(request_body, {"test": "header"}), ) @@ -267,176 +250,3 @@ def test_strip_request_headers(self) -> None: resp = proxy_request(request, self.organization.slug, url_name) assert not any([header in resp for header in INVALID_OUTBOUND_HEADERS]) - - -CB_ENABLED = { - "apigateway.proxy.circuit-breaker.enabled": True, - "apigateway.proxy.circuit-breaker.enforce": True, -} - - -@control_silo_test(cells=[ApiGatewayTestCase.CELL]) -class ProxyCircuitBreakerTestCase(ApiGatewayTestCase): - def _make_breaker_mock(self, *, allow_request: bool) -> MagicMock: - mock_breaker = MagicMock() - mock_breaker.should_allow_request.return_value = allow_request - return mock_breaker - - @responses.activate - @override_options(CB_ENABLED) - def test_open_circuit_returns_503(self) -> None: - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - mock_breaker_class.return_value = self._make_breaker_mock(allow_request=False) - request = RequestFactory().get("http://sentry.io/get") - resp = proxy_request(request, self.organization.slug, url_name) - assert isinstance(resp, HttpResponse) - assert resp.status_code == 503 - assert json.loads(resp.content) == { - "error": "apigateway", - "detail": "Downstream service temporarily unavailable", - } - - @responses.activate - @override_options(CB_ENABLED) - def test_circuit_breaker_keyed_per_cell(self) -> None: - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - mock_breaker_class.return_value = self._make_breaker_mock(allow_request=False) - request = RequestFactory().get("http://sentry.io/get") - proxy_request(request, self.organization.slug, url_name) - key_used = mock_breaker_class.call_args.kwargs["key"] - assert key_used == f"apigateway.proxy.{self.CELL.name}" - - @responses.activate - def test_circuit_breaker_disabled_by_default(self) -> None: - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - request = RequestFactory().get("http://sentry.io/get") - proxy_request(request, self.organization.slug, url_name) - mock_breaker_class.assert_not_called() - - @responses.activate - @override_options( - { - "apigateway.proxy.circuit-breaker.enabled": True, - "apigateway.proxy.circuit-breaker.enforce": False, - } - ) - def test_open_circuit_not_enforced(self) -> None: - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - mock_breaker_class.return_value = self._make_breaker_mock(allow_request=False) - request = RequestFactory().get("http://sentry.io/get") - resp = proxy_request(request, self.organization.slug, url_name) - assert resp.status_code == 200 - - @responses.activate - @override_options({"apigateway.proxy.circuit-breaker.config": "invalid-lol", **CB_ENABLED}) - def test_handles_invalid_config(self) -> None: - request = RequestFactory().get("http://sentry.io/get") - res = proxy_request(request, self.organization.slug, url_name) - assert res.status_code == 200 - - @responses.activate - @override_options(CB_ENABLED) - def test_timeout_records_error(self) -> None: - responses.add( - responses.GET, - f"{self.CELL.address}/timeout", - body=Timeout(), - ) - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - mock_breaker = self._make_breaker_mock(allow_request=True) - mock_breaker_class.return_value = mock_breaker - request = RequestFactory().get("http://sentry.io/timeout") - with pytest.raises(RequestTimeout): - proxy_request(request, self.organization.slug, url_name) - mock_breaker.record_error.assert_called_once() - - @responses.activate - @override_options(CB_ENABLED) - def test_connection_error_records_error(self) -> None: - responses.add( - responses.GET, - f"{self.CELL.address}/connect-error", - body=ConnectionError(), - ) - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - mock_breaker = self._make_breaker_mock(allow_request=True) - mock_breaker_class.return_value = mock_breaker - request = RequestFactory().get("http://sentry.io/connect-error") - with pytest.raises(ConnectionError): - proxy_request(request, self.organization.slug, url_name) - mock_breaker.record_error.assert_called_once() - - @responses.activate - @override_options(CB_ENABLED) - def test_connection_error_records_metric(self) -> None: - responses.add( - responses.GET, - f"{self.CELL.address}/connect-error", - body=ConnectionError(), - ) - with patch("sentry.hybridcloud.apigateway.proxy.metrics") as mock_metrics: - request = RequestFactory().get("http://sentry.io/connect-error") - with pytest.raises(ConnectionError): - proxy_request(request, self.organization.slug, url_name) - mock_metrics.incr.assert_any_call( - "apigateway.proxy.connection_error", - tags={"region": self.CELL.name, "url_name": url_name}, - ) - - @responses.activate - @override_options(CB_ENABLED) - def test_504_response_does_record_error(self) -> None: - responses.add( - responses.GET, - f"{self.CELL.address}/server-error", - status=504, - body=json.dumps({"detail": "gateway timeout"}), - content_type="application/json", - ) - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - mock_breaker = self._make_breaker_mock(allow_request=True) - mock_breaker_class.return_value = mock_breaker - request = RequestFactory().get("http://sentry.io/server-error") - resp = proxy_request(request, self.organization.slug, url_name) - assert resp.status_code == 504 - mock_breaker.record_error.assert_called_once() - - @responses.activate - @override_options(CB_ENABLED) - def test_500_response_does_not_record_error(self) -> None: - responses.add( - responses.GET, - f"{self.CELL.address}/server-error", - status=500, - body=json.dumps({"detail": "internal server error"}), - content_type="application/json", - ) - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - mock_breaker = self._make_breaker_mock(allow_request=True) - mock_breaker_class.return_value = mock_breaker - request = RequestFactory().get("http://sentry.io/server-error") - resp = proxy_request(request, self.organization.slug, url_name) - assert resp.status_code == 500 - mock_breaker.record_error.assert_not_called() - - @responses.activate - @override_options(CB_ENABLED) - def test_4xx_response_does_not_record_error(self) -> None: - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - mock_breaker = self._make_breaker_mock(allow_request=True) - mock_breaker_class.return_value = mock_breaker - request = RequestFactory().get("http://sentry.io/error") - resp = proxy_request(request, self.organization.slug, url_name) - assert resp.status_code == 400 - mock_breaker.record_error.assert_not_called() - - @responses.activate - @override_options(CB_ENABLED) - def test_2xx_response_does_not_record_error(self) -> None: - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - mock_breaker = self._make_breaker_mock(allow_request=True) - mock_breaker_class.return_value = mock_breaker - request = RequestFactory().get("http://sentry.io/get") - resp = proxy_request(request, self.organization.slug, url_name) - assert resp.status_code == 200 - mock_breaker.record_error.assert_not_called() diff --git a/tests/sentry/middleware/test_proxy.py b/tests/sentry/middleware/test_proxy.py index 09043bbe2b58..57406f423b0d 100644 --- a/tests/sentry/middleware/test_proxy.py +++ b/tests/sentry/middleware/test_proxy.py @@ -1,6 +1,8 @@ from __future__ import annotations +import asyncio from functools import cached_property +from unittest.mock import patch from django.http import HttpRequest @@ -8,6 +10,7 @@ from sentry.models.team import Team from sentry.silo.base import SiloMode from sentry.testutils.cases import APITestCase, TestCase +from sentry.testutils.helpers.response import close_streaming_response from sentry.testutils.silo import assume_test_silo_mode, control_silo_test from sentry.types.cell import Cell, RegionCategory from sentry.utils import json @@ -48,6 +51,29 @@ class FakedAPIProxyTest(APITestCase): endpoint = "sentry-api-0-organization-teams" method = "post" + def setUp(self) -> None: + super().setUp() + + from sentry.hybridcloud.apigateway_async.middleware import ApiGatewayMiddleware + + _original_middleware = ApiGatewayMiddleware._process_view_inner + + def _process_view_match(self, request, view_func, view_args, view_kwargs): + try: + asyncio.get_running_loop() + return self._process_view_inner(request, view_func, view_args, view_kwargs) + except RuntimeError: + return self._process_view_sync(request, view_func, view_args, view_kwargs) + + self._middleware_patch = patch.object( + ApiGatewayMiddleware, "_process_view_match", _process_view_match + ) + self._middleware_patch.start() + + def tearDown(self) -> None: + self._middleware_patch.stop() + super().tearDown() + def test_through_api_gateway(self) -> None: if SiloMode.get_current_mode() == SiloMode.MONOLITH: return @@ -62,7 +88,7 @@ def test_through_api_gateway(self) -> None: status_code=201, ) - result = json.loads(resp.getvalue()) + result = json.loads(close_streaming_response(resp)) with assume_test_silo_mode(SiloMode.CELL): team = Team.objects.get(id=result["id"]) assert team.idp_provisioned diff --git a/tests/sentry/objectstore/endpoints/test_organization.py b/tests/sentry/objectstore/endpoints/test_organization.py index fb5eecffda7e..d327fc4406c1 100644 --- a/tests/sentry/objectstore/endpoints/test_organization.py +++ b/tests/sentry/objectstore/endpoints/test_organization.py @@ -1,14 +1,18 @@ import os from dataclasses import asdict +from unittest.mock import patch +import httpx import pytest import requests import zstandard from django.db import connections +from django.http import HttpResponse, StreamingHttpResponse from django.urls import reverse from objectstore_client import Client, RequestError, Session, Usecase from pytest_django.live_server_helper import LiveServer +from sentry.hybridcloud.apigateway_async import proxy as proxy_mod from sentry.silo.base import SiloMode, SingleProcessSiloModeState from sentry.testutils.asserts import assert_status_code from sentry.testutils.cases import TransactionTestCase @@ -180,7 +184,53 @@ def setUp(self) -> None: scope_list=["project:releases"], ) + #: some shenanigans to work around async/sync hell: + # - use a "one shot" httpx client, so that we're not bound previous + # no-more existing event loops + # - patch the middleware to consume original streamed response body + # before the loop gets closed/destroyed + class HTTPXOneShotClient: + def __init__(self): + self.inner = None + + def __getattr__(self, name): + return getattr(self.inner, name) + + def build_request(self, *args, **kwargs): + self.inner = httpx.AsyncClient() + return self.inner.build_request(*args, **kwargs) + + from sentry.hybridcloud.apigateway_async.middleware import ApiGatewayMiddleware + + _original_middleware = ApiGatewayMiddleware._process_view_inner + + async def _eager_process_view_inner(mw_self, request, view_func, view_args, view_kwargs): + resp = await _original_middleware(mw_self, request, view_func, view_args, view_kwargs) + if isinstance(resp, StreamingHttpResponse) and resp.is_async: + body = b"" + async for chunk in resp: + body += chunk + sync_resp = HttpResponse( + content=body, + status=resp.status_code, + content_type=resp.get("Content-Type"), + ) + for header, value in resp.items(): + if header.lower() != "content-type": + sync_resp[header] = value + return sync_resp + return resp + + self._apigateway_patch = patch.object(proxy_mod, "proxy_client", HTTPXOneShotClient()) + self._middleware_patch = patch.object( + ApiGatewayMiddleware, "_process_view_inner", _eager_process_view_inner + ) + self._apigateway_patch.start() + self._middleware_patch.start() + def tearDown(self) -> None: + self._middleware_patch.stop() + self._apigateway_patch.stop() for conn in connections.all(): conn.close() super().tearDown() @@ -205,8 +255,6 @@ def test_health(self) -> None: follow=True, ) assert response.status_code == 200 - # consume body to close connection - b"".join(response.streaming_content) # type: ignore[attr-defined] def test_full_cycle(self) -> None: config = asdict(test_region) @@ -225,7 +273,7 @@ def test_full_cycle(self) -> None: follow=True, ) assert_status_code(response, 201) - object_key = json.loads(b"".join(response.streaming_content))["key"] # type: ignore[attr-defined] + object_key = json.loads(response.content)["key"] assert object_key is not None response = self.client.get( @@ -234,8 +282,7 @@ def test_full_cycle(self) -> None: follow=True, ) assert_status_code(response, 200) - retrieved_data = b"".join(response.streaming_content) # type: ignore[attr-defined] - assert retrieved_data == b"test data" + assert response.content == b"test data" response = self.client.put( f"{base_url}{object_key}", @@ -245,7 +292,7 @@ def test_full_cycle(self) -> None: follow=True, ) assert_status_code(response, 200) - new_key = json.loads(b"".join(response.streaming_content))["key"] # type: ignore[attr-defined] + new_key = json.loads(response.content)["key"] assert new_key == object_key response = self.client.get( @@ -254,8 +301,7 @@ def test_full_cycle(self) -> None: follow=True, ) assert_status_code(response, 200) - retrieved = b"".join(response.streaming_content) # type: ignore[attr-defined] - assert retrieved == b"new data" + assert response.content == b"new data" response = self.client.delete( f"{base_url}{object_key}", @@ -263,8 +309,6 @@ def test_full_cycle(self) -> None: follow=True, ) assert_status_code(response, 204) - # consume body to close connection - b"".join(response.streaming_content) # type: ignore[attr-defined] response = self.client.get( f"{base_url}{object_key}", @@ -272,8 +316,6 @@ def test_full_cycle(self) -> None: follow=True, ) assert_status_code(response, 404) - # consume body to close connection - b"".join(response.streaming_content) # type: ignore[attr-defined] def test_roundtrip_compressed(self) -> None: config = asdict(test_region) @@ -297,7 +339,7 @@ def test_roundtrip_compressed(self) -> None: follow=True, ) assert_status_code(response, 201) - object_key = json.loads(b"".join(response.streaming_content))["key"] # type: ignore[attr-defined] + object_key = json.loads(response.content)["key"] assert object_key is not None response = self.client.get( @@ -306,5 +348,4 @@ def test_roundtrip_compressed(self) -> None: follow=True, ) assert_status_code(response, 200) - retrieved = b"".join(response.streaming_content) # type: ignore[attr-defined] - assert retrieved == data + assert response.content == data diff --git a/uv.lock b/uv.lock index 0c70205b032d..041cffa6e8c2 100644 --- a/uv.lock +++ b/uv.lock @@ -796,6 +796,9 @@ pname = [ reload = [ { name = "watchfiles", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, ] +uvloop = [ + { name = "uvloop", marker = "(platform_python_implementation == 'CPython' and sys_platform == 'darwin') or (platform_python_implementation == 'CPython' and sys_platform == 'linux')" }, +] [[package]] name = "grpc-google-iam-v1" @@ -949,17 +952,16 @@ http2 = [ [[package]] name = "httpx" -version = "0.25.2" +version = "0.28.1" source = { registry = "https://pypi.devinfra.sentry.io/simple" } dependencies = [ { name = "anyio", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "certifi", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "httpcore", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "idna", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, - { name = "sniffio", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, ] wheels = [ - { url = "https://pypi.devinfra.sentry.io/wheels/httpx-0.25.2-py3-none-any.whl", hash = "sha256:a05d3d052d9b2dfce0e3896636467f8a5342fb2b902c819428e1ac65413ca118" }, + { url = "https://pypi.devinfra.sentry.io/wheels/httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad" }, ] [[package]] @@ -2151,11 +2153,12 @@ dependencies = [ { name = "google-cloud-storage-transfer", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "google-crc32c", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "googleapis-common-protos", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, - { name = "granian", extra = ["pname", "reload"], marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "granian", extra = ["pname", "reload", "uvloop"], marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "grpc-google-iam-v1", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "grpcio", version = "1.67.0", source = { registry = "https://pypi.devinfra.sentry.io/simple" }, marker = "(python_full_version < '3.14' and sys_platform == 'darwin') or (python_full_version < '3.14' and sys_platform == 'linux')" }, { name = "grpcio", version = "1.75.1", source = { registry = "https://pypi.devinfra.sentry.io/simple" }, marker = "(python_full_version >= '3.14' and sys_platform == 'darwin') or (python_full_version >= '3.14' and sys_platform == 'linux')" }, { name = "hiredis", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "httpx", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "iso3166", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "jsonschema", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "lxml", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, @@ -2319,10 +2322,11 @@ requires-dist = [ { name = "google-cloud-storage-transfer", specifier = ">=1.17.0" }, { name = "google-crc32c", specifier = ">=1.6.0" }, { name = "googleapis-common-protos", specifier = ">=1.63.2" }, - { name = "granian", extras = ["pname", "reload"], specifier = ">=2.7" }, + { name = "granian", extras = ["pname", "reload", "uvloop"], specifier = ">=2.7" }, { name = "grpc-google-iam-v1", specifier = ">=0.13.1" }, { name = "grpcio", specifier = ">=1.67.0" }, { name = "hiredis", specifier = ">=2.3.2" }, + { name = "httpx", specifier = ">=0.28.1" }, { name = "iso3166", specifier = ">=2.1.1" }, { name = "jsonschema", specifier = ">=4.20.0" }, { name = "lxml", specifier = ">=5.3.0" }, @@ -3090,6 +3094,17 @@ socks = [ { name = "pysocks", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, ] +[[package]] +name = "uvloop" +version = "0.21.0" +source = { registry = "https://pypi.devinfra.sentry.io/simple" } +wheels = [ + { url = "https://pypi.devinfra.sentry.io/wheels/uvloop-0.21.0-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:bfd55dfcc2a512316e65f16e503e9e450cab148ef11df4e4e679b5e8253a5281" }, + { url = "https://pypi.devinfra.sentry.io/wheels/uvloop-0.21.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:787ae31ad8a2856fc4e7c095341cccc7209bd657d0e71ad0dc2ea83c4a6fa8af" }, + { url = "https://pypi.devinfra.sentry.io/wheels/uvloop-0.21.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5ee4d4ef48036ff6e5cfffb09dd192c7a5027153948d85b8da7ff705065bacc6" }, + { url = "https://pypi.devinfra.sentry.io/wheels/uvloop-0.21.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f3df876acd7ec037a3d005b3ab85a7e4110422e4d9c1571d4fc89b0fc41b6816" }, +] + [[package]] name = "virtualenv" version = "20.26.6" From fe3ee31541c6aea79e08faff45b2017c5bd0f12e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dominik=20Dorfmeister=20=F0=9F=94=AE?= Date: Tue, 31 Mar 2026 11:33:51 +0200 Subject: [PATCH 4/4] docs(agents): Update API call examples to use TanStack Query with apiOptions (#111782) --- static/AGENTS.md | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/static/AGENTS.md b/static/AGENTS.md index 37374b51c11c..f5a441d194e8 100644 --- a/static/AGENTS.md +++ b/static/AGENTS.md @@ -35,18 +35,31 @@ ### Frontend API Calls +Prefer `apiOptions` with `useQuery` from TanStack Query for type-safe, consistent API calls: + ```typescript -import {useApiQuery, type UseApiQueryResult} from 'sentry/utils/queryClient'; +import {skipToken, useQuery} from '@tanstack/react-query'; +import {apiOptions} from 'sentry/utils/api/apiOptions'; + +// Basic usage +const query = useQuery( + apiOptions.as()('/organizations/$organizationIdOrSlug/endpoint/', { + path: {organizationIdOrSlug: organization.slug}, + staleTime: 30_000, + }) +); -const appSizeQuery: UseApiQueryResult = useApiQuery( - [`/projects/${organization.slug}/pull-requests/size-analysis/${selectedBuildId}/`], - { - staleTime: , // Optional, amount of time before data is considered stale (in ms) - enabled: , // Optional, whether the query is enabled - } +// Conditional fetching — pass skipToken as path to disable the query +const query = useQuery( + apiOptions.as()('/organizations/$organizationIdOrSlug/items/$itemId/', { + path: itemId ? {organizationIdOrSlug: organization.slug, itemId} : skipToken, + staleTime: 30_000, + }) ); ``` +Existing code might use `useApiQuery` from `sentry/utils/queryClient` — prefer `apiOptions` for new code. + ## General Frontend Rules 1. NO new Reflux stores