diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index a2f08ede5ed774..fd5d107b0a33b7 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -35,6 +35,7 @@ /src/sentry/projectoptions/ @getsentry/app-backend /src/sentry/receivers/ @getsentry/app-backend /src/sentry/ratelimits/ @getsentry/app-backend +/src/sentry/asgi.py @getsentry/app-backend /tests/js/sentry-test/ @getsentry/app-frontend /static/app/utils/ @getsentry/app-frontend diff --git a/pyproject.toml b/pyproject.toml index cf052bb24e5359..60264b09ad04b8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,7 +37,7 @@ dependencies = [ "google-cloud-storage-transfer>=1.17.0", "google-crc32c>=1.6.0", "googleapis-common-protos>=1.63.2", - "granian[pname,reload]>=2.7", + "granian[pname,reload,uvloop]>=2.7", "grpc-google-iam-v1>=0.13.1", # Note, grpcio>1.30.0 requires setting GRPC_POLL_STRATEGY=epoll1 # See https://github.com/grpc/grpc/issues/23796 and @@ -45,6 +45,7 @@ dependencies = [ "grpcio>=1.67.0", # not directly used, but provides a speedup for redis "hiredis>=2.3.2", + "httpx>=0.28.1", "jsonschema>=4.20.0", "lxml>=5.3.0", "maxminddb>=2.3.0", diff --git a/src/sentry/api/serializers/models/project.py b/src/sentry/api/serializers/models/project.py index 5d3b8c7a1919b8..25b5cce08e9ebd 100644 --- a/src/sentry/api/serializers/models/project.py +++ b/src/sentry/api/serializers/models/project.py @@ -964,6 +964,7 @@ class DetailedProjectResponse(ProjectWithTeamResponseDict): tempestFetchScreenshots: NotRequired[bool] autofixAutomationTuning: NotRequired[str] seerScannerAutomation: NotRequired[bool] + scmSourceContextEnabled: NotRequired[bool] debugFilesRole: NotRequired[str | None] @@ -1114,6 +1115,9 @@ def serialize( attrs, "sentry:seer_scanner_automation" ), "debugFilesRole": attrs["options"].get("sentry:debug_files_role"), + "scmSourceContextEnabled": self.get_value_with_default( + attrs, "sentry:scm_source_context_enabled" + ), } if has_tempest_access(obj.organization): diff --git 
"""ASGI entry point for Sentry.

Importing this module makes sure Django settings are configured and exposes
the module-level ``application`` object, an ``ASGIHandler`` instance.
"""

import os.path
import sys

# Add the project to the python path
# (the parent directory of the directory containing this file is put first on sys.path).
sys.path.insert(0, os.path.join(os.path.dirname(__file__), os.pardir))

# Configure the application only if it seemingly isn't already configured
from django.conf import settings

if not settings.configured:
    # Imported lazily so that an already-configured process never loads the runner.
    from sentry.runner import configure

    configure()

# Imported after the configuration step above, mirroring the statement order of this file.
from django.core.handlers.asgi import ASGIHandler

# Run ASGI handler for the application
application = ASGIHandler()
7f448f1cc9ed36..02e2e9f64e8827 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -371,6 +371,12 @@ def env( # so that responses aren't modified after Content-Length is set, or have the # response modifying middleware reset the Content-Length header. # This is because CommonMiddleware Sets the Content-Length header for non-streaming responses. +APIGW_ASYNC = os.environ.get("SENTRY_APIGW_ASYNC", "").lower() in ("1", "true", "y", "yes") +APIGW_MIDDLEWARE = ( + "sentry.hybridcloud.apigateway_async.middleware.ApiGatewayMiddleware" + if APIGW_ASYNC + else "sentry.hybridcloud.apigateway.middleware.ApiGatewayMiddleware" +) MIDDLEWARE: tuple[str, ...] = ( "csp.middleware.CSPMiddleware", "sentry.middleware.health.HealthCheck", @@ -387,7 +393,7 @@ def env( "sentry.middleware.auth.AuthenticationMiddleware", "sentry.middleware.ai_agent.AIAgentMiddleware", "sentry.middleware.integrations.IntegrationControlMiddleware", - "sentry.hybridcloud.apigateway.middleware.ApiGatewayMiddleware", + APIGW_MIDDLEWARE, "sentry.middleware.demo_mode_guard.DemoModeGuardMiddleware", "sentry.middleware.customer_domain.CustomerDomainMiddleware", "sentry.middleware.sudo.SudoMiddleware", @@ -3181,6 +3187,9 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]: } # Used in tests to skip forwarding relay paths to a region silo that does not exist. 
def validate_scmSourceContextEnabled(self, value):
    """Allow enabling SCM source context only when the organization has the feature.

    Falsy values (disabling, or field not provided) are returned untouched
    without consulting the feature flag. A truthy value is accepted only when
    ``organizations:scm-source-context`` is enabled for the organization of the
    project found in the serializer context.

    Raises:
        serializers.ValidationError: the value is truthy but the organization
            does not have the feature.
    """
    if not value:
        return value

    project = self.context["project"]
    if features.has("organizations:scm-source-context", project.organization):
        return value

    raise serializers.ValidationError(
        "Organization does not have the SCM source context feature enabled."
    )
/dev/null +++ b/src/sentry/hybridcloud/apigateway_async/__init__.py @@ -0,0 +1,3 @@ +from .apigateway import proxy_request_if_needed + +__all__ = ("proxy_request_if_needed",) diff --git a/src/sentry/hybridcloud/apigateway_async/apigateway.py b/src/sentry/hybridcloud/apigateway_async/apigateway.py new file mode 100644 index 00000000000000..3e02e6613aee17 --- /dev/null +++ b/src/sentry/hybridcloud/apigateway_async/apigateway.py @@ -0,0 +1,108 @@ +from __future__ import annotations + +import logging +from collections.abc import Callable +from typing import Any + +from django.conf import settings +from django.http.response import HttpResponseBase +from rest_framework.request import Request + +from sentry.silo.base import SiloLimit, SiloMode +from sentry.types.cell import get_cell_by_name +from sentry.utils import metrics + +from .proxy import ( + proxy_cell_request, + proxy_error_embed_request, + proxy_request, +) + +logger = logging.getLogger(__name__) + + +def _get_view_silo_mode(view_func: Callable[..., HttpResponseBase]) -> frozenset[SiloMode] | None: + view_class = getattr(view_func, "view_class", None) + if not view_class: + return None + if not hasattr(view_class, "silo_limit"): + return None + endpoint_silo_limit: SiloLimit = view_class.silo_limit + return endpoint_silo_limit.modes + + +async def proxy_request_if_needed( + request: Request, + view_func: Callable[..., HttpResponseBase], + view_kwargs: dict[str, Any], +) -> HttpResponseBase | None: + """ + Main execution flow for the API Gateway. + returns None if proxying is not required, or a response if the proxy was successful. 
+ """ + current_silo_mode = SiloMode.get_current_mode() + if current_silo_mode != SiloMode.CONTROL: + return None + + silo_modes = _get_view_silo_mode(view_func) + if not silo_modes or current_silo_mode in silo_modes: + return None + + url_name = "unknown" + if request.resolver_match: + url_name = request.resolver_match.url_name or url_name + + if "organization_slug" in view_kwargs or "organization_id_or_slug" in view_kwargs: + org_id_or_slug = str( + view_kwargs.get("organization_slug") or view_kwargs.get("organization_id_or_slug", "") + ) + + metrics.incr( + "apigateway.proxy_request", + tags={ + "url_name": url_name, + "kind": "orgslug", + }, + ) + return await proxy_request(request, org_id_or_slug, url_name) + + if url_name == "sentry-error-page-embed" and "dsn" in request.GET: + # Error embed modal is special as customers can't easily use cell URLs. + dsn = request.GET["dsn"] + metrics.incr( + "apigateway.proxy_request", + tags={ + "url_name": url_name, + "kind": "error-embed", + }, + ) + return await proxy_error_embed_request(request, dsn, url_name) + + if ( + request.resolver_match + and request.resolver_match.url_name in settings.REGION_PINNED_URL_NAMES + ): + cell = get_cell_by_name(settings.SENTRY_MONOLITH_REGION) + metrics.incr( + "apigateway.proxy_request", + tags={ + "url_name": url_name, + "kind": "regionpin", + }, + ) + + return await proxy_cell_request(request, cell, url_name) + + if url_name != "unknown": + # If we know the URL but didn't proxy it record we could be missing + # URL handling and that needs to be fixed. 
class CircuitBreaker:
    """Concurrency cap plus a windowed failure counter for one proxy target.

    ``concurrency`` sizes an ``asyncio.Semaphore``; ``overflow()`` reports
    whether that semaphore is currently exhausted.

    ``failures`` is a ``(window, max_failures)`` pair. The window is compared
    against whole-second readings of ``time.monotonic()``. Failures are counted
    in the active slot of a two-element list; once at least ``window`` seconds
    have passed since the last flip, the active slot switches to the other
    (already zeroed) slot and the old one is cleared, which restarts the count.
    """

    __slots__ = [
        "concurrency",
        "counter_window",
        "failures",
        "semaphore",
        "_clock",
        "_counters",
        "_counter_idx",
    ]

    def __init__(self, concurrency: int, failures: tuple[int, int]) -> None:
        self.concurrency = concurrency
        self.counter_window = failures[0]
        self.failures = failures[1]
        self.semaphore = asyncio.Semaphore(self.concurrency)
        self._clock = 0
        self._counters = [0, 0]
        self._counter_idx = 0

    def _counter_flip(self, clock: int) -> None:
        """Record ``clock`` as the flip time, switch slots and clear the old one."""
        self._clock = clock
        prev = self._counter_idx
        self._counter_idx = 1 - prev
        self._counters[prev] = 0

    def _maybe_counter_flip(self) -> None:
        """Flip the active slot when the current window has elapsed."""
        now = int(time.monotonic())
        delta = now - self._clock
        # The previous truth test `delta // self.counter_window` raised
        # ZeroDivisionError for a window of 0. For every non-zero window the
        # comparison below selects exactly the same cases.
        if delta > 0 and delta >= self.counter_window:
            self._counter_flip(now)

    def counter_incr(self) -> None:
        """Count one failure in the current window."""
        self._maybe_counter_flip()
        self._counters[self._counter_idx] += 1

    def window_overflow(self) -> bool:
        """Return True when the current window holds more than ``failures`` failures."""
        self._maybe_counter_flip()
        return self._counters[self._counter_idx] > self.failures

    def overflow(self) -> bool:
        """Return True when no concurrency slot is free right now."""
        return self.semaphore.locked()

    def ctx(self) -> CircuitBreakerCtx:
        """Return an async context manager bound to this breaker."""
        return CircuitBreakerCtx(self)
class CircuitBreakerManager:
    """Lazily creates one ``CircuitBreaker`` per key, all with the same limits.

    Limits that are not supplied are read from Django settings
    (``APIGATEWAY_PROXY_MAX_CONCURRENCY``, ``APIGATEWAY_PROXY_MAX_FAILURES``,
    ``APIGATEWAY_PROXY_FAILURE_WINDOW``).
    """

    __slots__ = ["objs"]

    def __init__(
        self,
        max_concurrency: int | None = None,
        failures: int | None = None,
        failure_window: int | None = None,
    ):
        # A concurrency or window of 0 is not usable by the breaker, so for
        # these two a falsy argument keeps falling back to the configured value.
        concurrency = max_concurrency or settings.APIGATEWAY_PROXY_MAX_CONCURRENCY
        failure_window = failure_window or settings.APIGATEWAY_PROXY_FAILURE_WINDOW
        # `failures=0` is a meaningful threshold (reject after the first failure
        # in a window), so only a missing argument uses the configured default.
        if failures is None:
            failures = settings.APIGATEWAY_PROXY_MAX_FAILURES
        self.objs: dict[str, CircuitBreaker] = defaultdict(
            lambda: CircuitBreaker(concurrency, (failure_window, failures))
        )

    def get(self, key: str) -> CircuitBreakerCtx:
        """Return a fresh context for the breaker of ``key``, creating the breaker on first use."""
        return self.objs[key].ctx()
class ApiGatewayMiddleware:
    """Proxy requests intended for remote silos"""

    async_capable = True
    sync_capable = True

    def __init__(self, get_response: Callable[[Request], HttpResponseBase]):
        self.get_response = get_response
        # Mark this instance as a coroutine function when the wrapped handler is one.
        if iscoroutinefunction(self.get_response):
            markcoroutinefunction(self)

    def __call__(self, request: Request) -> Any:
        # Async mode returns the coroutine from __acall__; sync mode calls straight through.
        handler = self.__acall__ if iscoroutinefunction(self) else self.get_response
        return handler(request)

    async def __acall__(self, request: Request) -> HttpResponseBase:
        return await self.get_response(request)  # type: ignore[misc]

    def process_view(
        self,
        request: Request,
        view_func: Callable[..., HttpResponseBase],
        view_args: tuple[str],
        view_kwargs: dict[str, Any],
    ) -> HttpResponseBase | None:
        return self._process_view_match(request, view_func, view_args, view_kwargs)

    def _process_view_match(
        self,
        request: Request,
        view_func: Callable[..., HttpResponseBase],
        view_args: tuple[str],
        view_kwargs: dict[str, Any],
    ) -> Any:
        # Decide once, on the first call, whether an event loop is running in
        # this thread, then shadow this method on the instance with the chosen
        # implementation so later calls skip the check.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            impl = self._process_view_sync  # type: ignore[assignment]
        else:
            impl = self._process_view_inner
        setattr(self, "_process_view_match", impl)
        return impl(request, view_func, view_args, view_kwargs)

    def _process_view_sync(
        self,
        request: Request,
        view_func: Callable[..., HttpResponseBase],
        view_args: tuple[str],
        view_kwargs: dict[str, Any],
    ) -> HttpResponseBase | None:
        # Run the async implementation to completion from synchronous code.
        run_blocking = async_to_sync(self._process_view_inner)
        return run_blocking(request, view_func, view_args, view_kwargs)

    async def _process_view_inner(
        self,
        request: Request,
        view_func: Callable[..., HttpResponseBase],
        view_args: tuple[str],
        view_kwargs: dict[str, Any],
    ) -> HttpResponseBase | None:
        # Either the proxied response or None; None lets normal view handling continue.
        return await proxy_request_if_needed(request, view_func, view_kwargs)
async def _stream_response_and_close(response: httpx.Response) -> AsyncGenerator[bytes]:
    """Relay the body of ``response`` in ``PROXY_CHUNK_SIZE`` pieces, then close it.

    ``response.aclose()`` is awaited in a ``finally`` clause, so the response is
    closed whether iteration finishes, raises, or the generator is closed early.
    """
    try:
        pieces = response.aiter_bytes(PROXY_CHUNK_SIZE)
        async for piece in pieces:
            yield piece
    finally:
        await response.aclose()
async def proxy_request(
    request: HttpRequest,
    org_id_or_slug: str,
    url_name: str,
) -> HttpResponseBase:
    """Take a django request object and proxy it to a remote location given an org_id_or_slug

    The cell is looked up with the synchronous ``get_cell_for_organization``
    wrapped in ``sync_to_async``. When the lookup raises ``CellResolutionError``
    the error is logged and an empty 404 response is returned; otherwise the
    request is handed to ``proxy_cell_request``.
    """
    resolve_cell = sync_to_async(get_cell_for_organization)
    try:
        cell = await resolve_cell(org_id_or_slug)
    except CellResolutionError as e:
        log_extra = {"org_slug": org_id_or_slug, "error": str(e)}
        logger.info("region_resolution_error", extra=log_extra)
        return HttpResponse(status=404)

    return await proxy_cell_request(request, cell, url_name)
async def proxy_cell_request(
    request: HttpRequest,
    cell: Cell,
    url_name: str,
) -> HttpResponseBase:
    """Take a django request object and proxy it to a cell silo

    The outgoing request targets ``cell.address`` joined with the incoming
    path, carries the cleaned incoming headers plus ``PROXY_APIGATEWAY_HEADER``,
    and is sent through the module-level ``proxy_client`` with response
    streaming enabled. The send runs inside the circuit breaker context for
    ``cell.name``.

    Returns:
        The adapted upstream response, or a JSON 429 / 503 response when the
        circuit breaker raises ``CircuitBreakerOverflow`` /
        ``CircuitBreakerWindowOverflow``.

    Raises:
        RequestTimeout: on ``httpx.TimeoutException`` or ``asyncio.CancelledError``.
        httpx.RequestError: re-raised after being counted as a failure.
    """
    metric_tags = {"region": cell.name, "url_name": url_name}
    target_url = urljoin(cell.address, request.path)

    # Read both headers before clean_proxy_headers() builds the outgoing header
    # dict; they are conditionally re-added below.
    content_encoding = request.headers.get("Content-Encoding")
    content_length = request.headers.get("Content-Length")
    header_dict = clean_proxy_headers(request.headers)
    header_dict[PROXY_APIGATEWAY_HEADER] = "true"

    assert request.method is not None
    query_params = request.GET

    # Some endpoints have a longer, per-URL-name timeout; everything else uses the default.
    timeout = ENDPOINT_TIMEOUT_OVERRIDE.get(url_name, settings.GATEWAY_PROXY_TIMEOUT)

    # XXX: See sentry.testutils.pytest.sentry for more information
    if settings.APIGATEWAY_PROXY_SKIP_RELAY and request.path.startswith("/api/0/relays/"):
        # NOTE(review): streaming_content is a plain str here, not an iterable of
        # chunks -- confirm that is intended for this test-only path.
        return StreamingHttpResponse(streaming_content="relay proxy skipped", status=404)

    try:
        async with circuitbreakers.get(cell.name) as circuitbreaker:
            if url_name == "sentry-api-0-organization-objectstore":
                if content_encoding:
                    header_dict["Content-Encoding"] = content_encoding
                data = get_raw_body_async(request)
            else:
                data = BodyAsyncWrapper(request.body)
            # With request streaming, and without `Content-Length` header,
            # `httpx` will set chunked transfer encoding.
            # Upstream doesn't necessarily support this,
            # thus we re-add the header if it was present in the original request.
            if content_length:
                header_dict["Content-Length"] = content_length

            try:
                with metrics.timer("apigateway.proxy_request.duration", tags=metric_tags):
                    req = proxy_client.build_request(
                        request.method,
                        target_url,
                        headers=header_dict,
                        params=dict(query_params) if query_params is not None else None,
                        content=_stream_request(data) if data else None,  # type: ignore[arg-type]
                        timeout=timeout,
                    )
                    resp = await proxy_client.send(req, stream=True, follow_redirects=False)
                    # Upstream statuses >= 502 count as failures but are still relayed to the caller.
                    if resp.status_code >= 502:
                        metrics.incr("apigateway.proxy.request_failed", tags=metric_tags)
                        circuitbreaker.incr_failures()
                    # NOTE(review): returning here leaves the `async with` block, so the
                    # circuit breaker context exits before the streamed body is consumed;
                    # presumably the concurrency limit then covers only the time until
                    # response headers arrive -- confirm this is intended.
                    return _adapt_response(resp, target_url)
            except (httpx.TimeoutException, asyncio.CancelledError):
                # NOTE(review): asyncio.CancelledError is translated into RequestTimeout
                # rather than re-raised, so a cancellation of the surrounding task does
                # not propagate as a cancellation -- confirm this is intended.
                metrics.incr("apigateway.proxy.request_timeout", tags=metric_tags)
                circuitbreaker.incr_failures()
                # remote silo timeout. Use DRF timeout instead
                raise RequestTimeout()
            except httpx.RequestError:
                metrics.incr("apigateway.proxy.request_failed", tags=metric_tags)
                circuitbreaker.incr_failures()
                raise
    except CircuitBreakerOverflow:
        # Raised on entering the context when every concurrency slot is taken.
        metrics.incr("apigateway.proxy.circuit_breaker.overflow", tags=metric_tags)
        return JsonResponse(
            {"error": "apigateway", "detail": "Too many requests"},
            status=429,
        )
    except CircuitBreakerWindowOverflow:
        # Raised on entering the context when the failure window is over its limit.
        metrics.incr("apigateway.proxy.circuit_breaker.rejected", tags=metric_tags)
        return JsonResponse(
            {"error": "apigateway", "detail": "Downstream service temporarily unavailable"},
            status=503,
        )
+from sentry.constants import ObjectStatus +from sentry.integrations.models.repository_project_path_config import RepositoryProjectPathConfig +from sentry.integrations.services.integration import integration_service +from sentry.integrations.source_code_management.repository import RepositoryIntegration +from sentry.issues.auto_source_code_config.code_mapping import ( + convert_stacktrace_frame_path_to_source_path, +) +from sentry.lang.javascript.utils import LINES_OF_CONTEXT, get_source_context +from sentry.shared_integrations.exceptions import ( + ApiError, + ApiRateLimitedError, + IntegrationConfigurationError, +) +from sentry.utils.event_frames import EventFrame + +if TYPE_CHECKING: + from sentry.integrations.base import IntegrationInstallation + from sentry.integrations.services.integration.model import RpcIntegration + from sentry.issues.endpoints.project_stacktrace_link import StacktraceLinkContext + from sentry.models.repository import Repository + +logger = logging.getLogger(__name__) + + +class SourceContextResult(TypedDict): + context: list[list[int | str]] # [[lineNo, content], ...] + error: str | None + source_url: str | None + + +def _format_context( + pre_context: list[bytes] | None, + context_line: bytes | None, + post_context: list[bytes] | None, + lineno: int, +) -> list[list[int | str]]: + """Format source context into [[lineNo, content], ...] 
tuples for the frontend.""" + result: list[list[int | str]] = [] + + start_line = lineno - len(pre_context or []) + + if pre_context: + for i, line in enumerate(pre_context): + result.append([start_line + i, line.decode("utf-8", errors="replace")]) + + if context_line is not None: + result.append([lineno, context_line.decode("utf-8", errors="replace")]) + + if post_context: + for i, line in enumerate(post_context): + result.append([lineno + 1 + i, line.decode("utf-8", errors="replace")]) + + return result + + +def _resolve_integration( + config: RepositoryProjectPathConfig, +) -> tuple[RpcIntegration, RepositoryIntegration] | None: + """Resolve the integration and installation for a code mapping config.""" + integration = integration_service.get_integration( + organization_integration_id=config.organization_integration_id, + status=ObjectStatus.ACTIVE, + ) + if not integration: + return None + + install = integration.get_installation(organization_id=config.project.organization_id) + if not isinstance(install, RepositoryIntegration): + return None + + return integration, install + + +def _fetch_file_from_scm( + install: IntegrationInstallation, + integration_id: int, + repository: Repository, + src_path: str, + ref: str, +) -> tuple[str | None, str | None]: + """ + Fetch file content from SCM. + + Returns (file_content, error). If error is "rate_limited", the caller + should stop iterating entirely. 
+ """ + try: + client = install.get_client() + except Exception: + logger.warning( + "scm_source_context.get_client_error", + extra={ + "integration_id": integration_id, + "src_path": src_path, + }, + exc_info=True, + ) + return None, "integration_error" + + try: + file_content = client.get_file(repository, src_path, ref) + except NotImplementedError: + return None, "get_file_not_supported" + except ApiRateLimitedError: + return None, "rate_limited" + except ApiError as e: + if e.code == 404: + return None, "file_not_found" + elif e.code == 403: + return None, "integration_forbidden" + else: + logger.warning( + "scm_source_context.fetch_error", + extra={ + "error": str(e), + "integration_id": integration_id, + "src_path": src_path, + }, + ) + return None, "integration_error" + + return file_content, None + + +def _extract_source_lines( + file_content: str, + lineno: int, + context_lines: int, +) -> tuple[list[list[int | str]], str | None]: + """ + Extract context lines from file content around the given line number. + + Returns (context, error). + """ + lines = [line.encode("utf-8") for line in file_content.splitlines()] + + if lineno < 1 or lineno > len(lines): + return [], "line_out_of_range" + + pre_context, context_line, post_context = get_source_context(lines, lineno, context_lines) + + context = _format_context(pre_context, context_line, post_context, lineno) + return context, None + + +def fetch_source_context_from_scm( + configs: Sequence[RepositoryProjectPathConfig], + ctx: StacktraceLinkContext, + context_lines: int = LINES_OF_CONTEXT, +) -> SourceContextResult: + """ + Fetch source context lines from an SCM integration for a stack trace frame. + + Iterates code mappings to resolve the frame file path to a repository path, + then fetches the file content via the integration client and extracts the + surrounding lines. 
+ """ + result: SourceContextResult = { + "context": [], + "error": None, + "source_url": None, + } + + line_no_str = ctx.get("line_no") + if not line_no_str: + result["error"] = "missing_line_number" + return result + + try: + lineno = int(line_no_str) + except (TypeError, ValueError): + result["error"] = "invalid_line_number" + return result + + frame = EventFrame.from_dict(ctx) + platform = ctx["platform"] + sdk_name = ctx.get("sdk_name") + + # Resolve integration and install once per unique org_integration_id + resolved_integrations: dict[int, tuple[RpcIntegration, RepositoryIntegration] | None] = {} + + for config in configs: + src_path = convert_stacktrace_frame_path_to_source_path( + frame=frame, + platform=platform, + sdk_name=sdk_name, + code_mapping=config, + ) + if not src_path: + continue + + org_integration_id = config.organization_integration_id + if org_integration_id not in resolved_integrations: + resolved_integrations[org_integration_id] = _resolve_integration(config) + + resolved = resolved_integrations[org_integration_id] + if resolved is None: + continue + + integration, install = resolved + + ref = ctx.get("commit_id") or str(config.default_branch or "") + + file_content, fetch_error = _fetch_file_from_scm( + install, integration.id, config.repository, src_path, ref + ) + + if fetch_error: + result["error"] = fetch_error + if fetch_error == "rate_limited": + return result + continue + + if file_content is None: + continue + + context, extract_error = _extract_source_lines(file_content, lineno, context_lines) + if extract_error: + result["error"] = extract_error + continue + + result["context"] = context + result["error"] = None + + try: + source_url = install.get_stacktrace_link( + config.repository, src_path, str(config.default_branch or ""), ctx.get("commit_id") + ) + result["source_url"] = source_url + except (ApiError, IntegrationConfigurationError): + pass + + return result + + if not result["error"]: + result["error"] = 
"no_code_mapping_match" + return result diff --git a/src/sentry/issues/endpoints/__init__.py b/src/sentry/issues/endpoints/__init__.py index a2ebb4670c4eaf..2163292b0215db 100644 --- a/src/sentry/issues/endpoints/__init__.py +++ b/src/sentry/issues/endpoints/__init__.py @@ -30,6 +30,7 @@ from .project_group_index import ProjectGroupIndexEndpoint from .project_group_stats import ProjectGroupStatsEndpoint from .project_stacktrace_link import ProjectStacktraceLinkEndpoint +from .project_stacktrace_source_context import ProjectStacktraceSourceContextEndpoint from .related_issues import RelatedIssuesEndpoint from .shared_group_details import SharedGroupDetailsEndpoint from .source_map_debug import SourceMapDebugEndpoint @@ -66,6 +67,7 @@ "ProjectGroupIndexEndpoint", "ProjectGroupStatsEndpoint", "ProjectStacktraceLinkEndpoint", + "ProjectStacktraceSourceContextEndpoint", "RelatedIssuesEndpoint", "SharedGroupDetailsEndpoint", "ShortIdLookupEndpoint", diff --git a/src/sentry/issues/endpoints/project_stacktrace_source_context.py b/src/sentry/issues/endpoints/project_stacktrace_source_context.py new file mode 100644 index 00000000000000..69c2ac577d7572 --- /dev/null +++ b/src/sentry/issues/endpoints/project_stacktrace_source_context.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +import logging + +from rest_framework.request import Request +from rest_framework.response import Response + +from sentry import features +from sentry.api.api_owners import ApiOwner +from sentry.api.api_publish_status import ApiPublishStatus +from sentry.api.base import cell_silo_endpoint +from sentry.api.bases.project import ProjectEndpoint +from sentry.integrations.utils.source_context import fetch_source_context_from_scm +from sentry.issues.auto_source_code_config.code_mapping import get_sorted_code_mapping_configs +from sentry.issues.endpoints.project_stacktrace_link import generate_context +from sentry.models.project import Project + +logger = logging.getLogger(__name__) + + 
+@cell_silo_endpoint +class ProjectStacktraceSourceContextEndpoint(ProjectEndpoint): + publish_status = { + "GET": ApiPublishStatus.PRIVATE, + } + """ + Returns source context lines for a stack trace frame by fetching the file + from the configured SCM integration (GitHub, GitLab, Perforce, etc.). + + `file`: The file path from the stack trace (required) + `lineNo`: The line number to fetch context around (required) + `commitId` (optional): The commit_id for the last commit of the release + `platform` (optional): The platform of the event + `sdkName` (optional): The sdk.name associated with the event + `absPath` (optional): The abs_path field value of the relevant stack frame + `module` (optional): The module field value of the relevant stack frame + `package` (optional): The package field value of the relevant stack frame + """ + + owner = ApiOwner.ISSUES + + def get(self, request: Request, project: Project) -> Response: + if not features.has( + "organizations:scm-source-context", + project.organization, + actor=request.user, + ): + return Response(status=404) + + if not project.get_option("sentry:scm_source_context_enabled", False): + return Response(status=404) + + ctx = generate_context(request.GET) + + if ctx["file"] == "": + return Response({"detail": "Filepath is required"}, status=400) + + if not ctx["line_no"]: + return Response({"detail": "lineNo is required"}, status=400) + + configs = get_sorted_code_mapping_configs(project) + if not configs: + return Response( + { + "context": [], + "error": "no_code_mappings_for_project", + "sourceUrl": None, + } + ) + + result = fetch_source_context_from_scm(configs, ctx) + + return Response( + { + "context": result["context"], + "error": result["error"], + "sourceUrl": result["source_url"], + } + ) diff --git a/src/sentry/models/options/project_option.py b/src/sentry/models/options/project_option.py index 8ff587fb2e426e..c5f3690a65b79e 100644 --- a/src/sentry/models/options/project_option.py +++ 
b/src/sentry/models/options/project_option.py @@ -78,6 +78,7 @@ "sentry:preprod_snapshot_status_checks_fail_on_added", "sentry:preprod_snapshot_status_checks_fail_on_removed", "sentry:preprod_distribution_pr_comments_enabled_by_customer", + "sentry:scm_source_context_enabled", "quotas:spike-protection-disabled", "feedback:branding", "digests:mail:minimum_delay", diff --git a/src/sentry/objectstore/endpoints/organization.py b/src/sentry/objectstore/endpoints/organization.py index c6ed04945011c7..5bb22190d1a37f 100644 --- a/src/sentry/objectstore/endpoints/organization.py +++ b/src/sentry/objectstore/endpoints/organization.py @@ -6,12 +6,13 @@ from wsgiref.util import is_hop_by_hop import requests +from asgiref.sync import sync_to_async from django.http import HttpRequest, StreamingHttpResponse from requests import Response as ExternalResponse from rest_framework.request import Request from rest_framework.response import Response -from sentry.utils.http import BodyWithLength +from sentry.utils.http import BodyAsyncWrapper, BodyWithLength # TODO(granian): Remove this and related code paths when we fully switch from uwsgi to granian uwsgi: Any = None @@ -150,6 +151,32 @@ def stream_generator(): return BodyWithLength(request) +def get_raw_body_async( + request: HttpRequest, +) -> BodyAsyncWrapper | ChunkedEncodingAsyncDecoder | BodyWithLength: + if request.body: + return BodyAsyncWrapper(request.body) + + wsgi_input = request.META.get("wsgi.input") + if "granian" in request.META.get("SERVER_SOFTWARE", "").lower(): + return BodyAsyncWrapper(wsgi_input) + + # wsgiref will raise an exception and hang when attempting to read wsgi.input while there's no body. + # For now, support bodies only on PUT and POST requests when not using Granian. 
+ if request.method not in ("PUT", "POST"): + return BodyAsyncWrapper(b"") + + # wsgiref (dev/test server) + if ( + hasattr(wsgi_input, "_read") + and request.headers.get("Transfer-Encoding", "").lower() == "chunked" + ): + return ChunkedEncodingAsyncDecoder(wsgi_input._read) # type: ignore[union-attr] + + # wsgiref and the request has been already proxied through control silo + return BodyWithLength(request) + + def get_target_url(path: str) -> str: base = options.get("objectstore.config")["base_url"].rstrip("/") # `path` should be a relative path, only grab that part @@ -248,3 +275,13 @@ def read(self, size: int = -1) -> bytes: raise ValueError("Malformed chunk encoded stream") return b"".join(buffer) + + +class ChunkedEncodingAsyncDecoder(ChunkedEncodingDecoder): + def __aiter__(self): + return self + + async def __anext__(self) -> bytes: + if self._done: + raise StopAsyncIteration + return await sync_to_async(self.read)() diff --git a/src/sentry/projectoptions/defaults.py b/src/sentry/projectoptions/defaults.py index 4534fbdedbbe73..6a29b4add7345d 100644 --- a/src/sentry/projectoptions/defaults.py +++ b/src/sentry/projectoptions/defaults.py @@ -210,3 +210,6 @@ # Boolean to enable/disable build distribution PR comments for this project. 
register(key="sentry:preprod_distribution_pr_comments_enabled_by_customer", default=True) + +# Whether to enable on-demand source context fetching from SCM integrations +register(key="sentry:scm_source_context_enabled", default=False) diff --git a/src/sentry/runner/commands/devserver.py b/src/sentry/runner/commands/devserver.py index ba9795f7c67f47..6a1e8a526b8588 100644 --- a/src/sentry/runner/commands/devserver.py +++ b/src/sentry/runner/commands/devserver.py @@ -471,9 +471,11 @@ def devserver( "SENTRY_SILO_DEVSERVER": "1", "SENTRY_SILO_MODE": "CONTROL", "SENTRY_REGION": "", + "SENTRY_APIGW_ASYNC": "true", "SENTRY_CONTROL_SILO_PORT": server_port, "SENTRY_REGION_SILO_PORT": str(ports["region.server"]), "SENTRY_DEVSERVER_BIND": f"127.0.0.1:{server_port}", + "SENTRY_GRANIAN_IFACE": "asginl", "SENTRY_GRANIAN_PORT": str(ports["server"]), "SENTRY_GRANIAN_WORKERS": "2", } diff --git a/src/sentry/services/http.py b/src/sentry/services/http.py index 9678945b00c4a8..f515a25a28a1b2 100644 --- a/src/sentry/services/http.py +++ b/src/sentry/services/http.py @@ -5,7 +5,6 @@ from typing import Any from granian import Granian -from granian.constants import Interfaces as GranianInterfaces from sentry.services.base import Service @@ -15,7 +14,7 @@ def _run_server(options: dict[str, Any]): target=options["module"], address=options["host"], port=options["port"], - interface=GranianInterfaces.WSGI, + interface=options["iface"], workers=options["workers"], backlog=options["backlog"], workers_kill_timeout=options["workers-kill-timeout"], @@ -51,6 +50,7 @@ def __init__( host = host or settings.SENTRY_WEB_HOST port = port or int(os.environ.get("SENTRY_GRANIAN_PORT", "0")) or settings.SENTRY_WEB_PORT workers = workers or int(os.environ.get("SENTRY_GRANIAN_WORKERS", "1")) + iface = os.environ.get("SENTRY_GRANIAN_IFACE", "wsgi") reload = bool(os.environ.get("SENTRY_GRANIAN_RELOAD")) options = (settings.SENTRY_WEB_OPTIONS or {}).copy() @@ -58,7 +58,8 @@ def __init__( for k, v in 
extra_options.items(): options[k] = v - options.setdefault("module", "sentry.wsgi:application") + options.setdefault("module", f"sentry.{iface[:4]}:application") + options.setdefault("iface", iface) options.setdefault("host", host) options.setdefault("port", port) options.setdefault("workers", workers) diff --git a/src/sentry/silo/base.py b/src/sentry/silo/base.py index 91dade5a9e59d0..cccbf81eadcb2a 100644 --- a/src/sentry/silo/base.py +++ b/src/sentry/silo/base.py @@ -3,7 +3,6 @@ import abc import contextlib import functools -import threading import typing from collections.abc import Callable, Generator, Iterable from enum import Enum @@ -56,7 +55,7 @@ def get_current_mode(cls) -> SiloMode: return SingleProcessSiloModeState.get_mode() or process_level_silo_mode -class SingleProcessSiloModeState(threading.local): +class SingleProcessSiloModeState: """ Used by silo endpoint decorators and other contexts that help 'suggest' to acceptance testing and local single process silo testing which 'silo context' the diff --git a/src/sentry/testutils/asserts.py b/src/sentry/testutils/asserts.py index e8c07edd4832ee..5b08264859f6f6 100644 --- a/src/sentry/testutils/asserts.py +++ b/src/sentry/testutils/asserts.py @@ -2,7 +2,6 @@ from django.db import models from django.db.models.functions import Cast -from django.http import StreamingHttpResponse from sentry.constants import ObjectStatus from sentry.integrations.types import EventLifecycleOutcome @@ -44,10 +43,7 @@ def assert_commit_shape(commit): def assert_status_code(response, minimum: int, maximum: int | None = None): # Omit max to assert status_code == minimum. 
maximum = maximum or minimum + 1 - assert minimum <= response.status_code < maximum, ( - response.status_code, - response.getvalue() if isinstance(response, StreamingHttpResponse) else response.content, - ) + assert minimum <= response.status_code < maximum, response def assert_existing_projects_status( diff --git a/src/sentry/testutils/cases.py b/src/sentry/testutils/cases.py index a1202f16f7f456..486513a7ca3e2d 100644 --- a/src/sentry/testutils/cases.py +++ b/src/sentry/testutils/cases.py @@ -17,6 +17,7 @@ from uuid import UUID, uuid4 from zlib import compress +import httpx import pytest import requests import responses @@ -36,7 +37,6 @@ from django.utils import timezone from django.utils.functional import cached_property from google.protobuf.timestamp_pb2 import Timestamp -from requests.utils import CaseInsensitiveDict, get_encoding_from_headers from rest_framework import status from rest_framework.request import Request from rest_framework.response import Response @@ -678,36 +678,45 @@ def get_cursor_headers(self, response): def api_gateway_proxy_stubbed(self): """Mocks a fake api gateway proxy that redirects via Client objects""" - def proxy_raw_request( - method: str, - url: str, - headers: Mapping[str, str], - params: Mapping[str, str] | None, - data: Any, - **kwds: Any, - ) -> requests.Response: - from django.test.client import Client - - client = Client() - extra: Mapping[str, Any] = { - f"HTTP_{k.replace('-', '_').upper()}": v for k, v in headers.items() - } - if params: - url += "?" 
+ urlencode(params) - with assume_test_silo_mode(SiloMode.CELL): - resp = getattr(client, method.lower())( - url, b"".join(data), headers["Content-Type"], **extra - ) - response = requests.Response() - response.status_code = resp.status_code - response.headers = CaseInsensitiveDict(resp.headers) - response.encoding = get_encoding_from_headers(response.headers) - response.raw = BytesIO(resp.content) - return response + from asgiref.sync import sync_to_async + from django.test.client import Client + + class MockedProxy: + def __init__(self): + self.client = Client() + + @staticmethod + async def _consume_body(content): + ret = b"" + async for chunk in content: + ret += chunk + return ret + + def build_request(self, method, url, headers, params, content, timeout): + assert not params + target = getattr(self.client, method.lower()) + content_type = headers.pop("Content-Type", "application/octet-stream") + extra: Mapping[str, Any] = { + f"HTTP_{k.replace('-', '_').upper()}": v for k, v in headers.items() + } + return target, (url, content, content_type), extra + + async def send(self, req, stream, follow_redirects): + with assume_test_silo_mode(SiloMode.CELL): + url, content, content_type = req[1] + content = await self._consume_body(content) + resp = await sync_to_async(req[0])(url, content, content_type, **req[2]) + wresp = httpx.Response( + status_code=resp.status_code, + headers=dict(resp.headers), + content=resp.content, + ) + return wresp + mock_client = MockedProxy() with mock.patch( - "sentry.hybridcloud.apigateway.proxy.external_request", - new=proxy_raw_request, + "sentry.hybridcloud.apigateway_async.proxy.proxy_client", + new=mock_client, ): yield diff --git a/src/sentry/testutils/helpers/apigateway.py b/src/sentry/testutils/helpers/apigateway.py index 08e3c4ac4bf6f1..bc3cde3f74f5cf 100644 --- a/src/sentry/testutils/helpers/apigateway.py +++ b/src/sentry/testutils/helpers/apigateway.py @@ -1,8 +1,12 @@ from __future__ import annotations +from collections.abc 
import Callable +from contextlib import contextmanager +from typing import Any +from unittest.mock import patch from urllib.parse import parse_qs -import responses +import httpx from django.conf import settings from django.http import HttpResponseRedirect from django.test import override_settings @@ -75,14 +79,98 @@ def get(self, request: Request) -> Response: ] + api_urls.urlpatterns +# Type for httpx mock callback: receives httpx.Request, returns (status, headers, body) +HttpxCallback = Callable[[httpx.Request], tuple[int, dict[str, str], str | bytes]] + + +class HttpxMockRouter: + """Mock HTTP router for httpx, replacing the `responses` library for async proxy tests.""" + + def __init__(self) -> None: + self._routes: list[dict[str, Any]] = [] + + def add( + self, + method: str, + url: str, + body: str | bytes = b"", + status_code: int = 200, + headers: dict[str, str] | None = None, + json_data: Any | None = None, + content_type: str | None = None, + ) -> None: + if json_data is not None: + body = json.dumps(json_data).encode() + content_type = content_type or "application/json" + elif isinstance(body, str): + body = body.encode() + resp_headers = dict(headers or {}) + if content_type: + resp_headers["Content-Type"] = content_type + self._routes.append( + { + "method": method.upper(), + "url": url, + "body": body, + "status_code": status_code, + "headers": resp_headers, + } + ) + + def add_callback(self, method: str, url: str, callback: HttpxCallback) -> None: + self._routes.append( + { + "method": method.upper(), + "url": url, + "callback": callback, + } + ) + + def handler(self, request: httpx.Request) -> httpx.Response: + url_str = str(request.url) + # Strip query params for matching + url_path = url_str.split("?")[0] + for route in self._routes: + if request.method != route["method"]: + continue + route_url = route["url"].split("?")[0] + if url_path != route_url: + continue + + if "callback" in route: + status_code, headers, body = route["callback"](request) 
+ if isinstance(body, str): + body = body.encode() + return httpx.Response( + status_code, headers=dict(headers), content=body, request=request + ) + else: + return httpx.Response( + route["status_code"], + headers=route["headers"], + content=route["body"], + request=request, + ) + + raise ValueError(f"No mock route matched: {request.method} {url_str}") + + +@contextmanager +def mock_proxy_client(router: HttpxMockRouter): + """Patch the proxy_client with a mock httpx.AsyncClient using the given router.""" + mock_client = httpx.AsyncClient(transport=httpx.MockTransport(router.handler)) + with patch("sentry.hybridcloud.apigateway_async.proxy.proxy_client", mock_client): + yield mock_client + + def verify_request_body(body, headers): - """Wrapper for a callback function for responses.add_callback""" + """Wrapper for a callback function for HttpxMockRouter.add_callback.""" - def request_callback(request): + def request_callback(request: httpx.Request): if request.headers.get("content-type") == "application/json": - assert json.load(request.body) == body + assert json.loads(request.content) == body else: - assert request.body.read() == body + assert request.content == (body if isinstance(body, bytes) else body.encode()) assert (request.headers[key] == headers[key] for key in headers) return 200, {}, json.dumps({"proxy": True}) @@ -90,9 +178,9 @@ def request_callback(request): def verify_request_headers(headers): - """Wrapper for a callback function for responses.add_callback""" + """Wrapper for a callback function for HttpxMockRouter.add_callback.""" - def request_callback(request): + def request_callback(request: httpx.Request): assert (request.headers[key] == headers[key] for key in headers) return 200, {}, json.dumps({"proxy": True}) @@ -100,10 +188,10 @@ def request_callback(request): def verify_request_params(params, headers): - """Wrapper for a callback function for responses.add_callback""" + """Wrapper for a callback function for HttpxMockRouter.add_callback.""" 
- def request_callback(request): - request_params = parse_qs(request.url.split("?")[1]) + def request_callback(request: httpx.Request): + request_params = parse_qs(str(request.url).split("?")[1]) assert (request.headers[key] == headers[key] for key in headers) for key in params: assert key in request_params @@ -117,10 +205,10 @@ def request_callback(request): def verify_file_body(file_body, headers): - """Wrapper for a callback function for responses.add_callback""" + """Wrapper for a callback function for HttpxMockRouter.add_callback.""" - def request_callback(request): - assert file_body in request.body.read() + def request_callback(request: httpx.Request): + assert file_body in request.content assert (request.headers[key] == headers[key] for key in headers) return 200, {}, json.dumps({"proxy": True}) @@ -129,8 +217,10 @@ def request_callback(request): def provision_middleware(): middleware = list(settings.MIDDLEWARE) - if "sentry.hybridcloud.apigateway.middleware.ApiGatewayMiddleware" not in middleware: - middleware = ["sentry.hybridcloud.apigateway.middleware.ApiGatewayMiddleware"] + middleware + if "sentry.hybridcloud.apigateway_async.middleware.ApiGatewayMiddleware" not in middleware: + middleware = [ + "sentry.hybridcloud.apigateway_async.middleware.ApiGatewayMiddleware" + ] + middleware return middleware @@ -148,34 +238,42 @@ class ApiGatewayTestCase(APITestCase): def setUp(self): super().setUp() - responses.add( - responses.GET, + self.httpx_router = HttpxMockRouter() + self.httpx_router.add( + "GET", f"{self.CELL.address}/get", body=json.dumps({"proxy": True}), content_type="application/json", - adding_headers={"test": "header"}, + headers={"test": "header"}, ) - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/error", body=json.dumps({"proxy": True}), - status=400, + status_code=400, content_type="application/json", - adding_headers={"test": "header"}, + headers={"test": "header"}, ) self.organization = 
self.create_organization(region=self.CELL) # Echos the request body and header back for verification - def return_request_body(request): - return (200, request.headers, request.body) + def return_request_body(request: httpx.Request): + return (200, dict(request.headers), request.content) # Echos the query params and header back for verification - def return_request_params(request): - params = parse_qs(request.url.split("?")[1]) - return (200, request.headers, json.dumps(params).encode()) + def return_request_params(request: httpx.Request): + params = parse_qs(str(request.url).split("?")[1]) + return (200, dict(request.headers), json.dumps(params).encode()) - responses.add_callback(responses.GET, f"{self.CELL.address}/echo", return_request_params) - responses.add_callback(responses.POST, f"{self.CELL.address}/echo", return_request_body) + self.httpx_router.add_callback("GET", f"{self.CELL.address}/echo", return_request_params) + self.httpx_router.add_callback("POST", f"{self.CELL.address}/echo", return_request_body) self.middleware = provision_middleware() + # Enter the mock proxy client context for the duration of the test + self._mock_proxy_ctx = mock_proxy_client(self.httpx_router) + self._mock_proxy_ctx.__enter__() + + def tearDown(self): + self._mock_proxy_ctx.__exit__(None, None, None) + super().tearDown() diff --git a/src/sentry/testutils/helpers/response.py b/src/sentry/testutils/helpers/response.py index d6681e93c6f884..87d359c443bc2b 100644 --- a/src/sentry/testutils/helpers/response.py +++ b/src/sentry/testutils/helpers/response.py @@ -2,6 +2,7 @@ from typing import TypeGuard +from asgiref.sync import async_to_sync from django.http.response import HttpResponseBase, StreamingHttpResponse from rest_framework.response import Response @@ -16,11 +17,23 @@ def is_streaming_response(response: HttpResponseBase) -> TypeGuard[StreamingHttp return isinstance(response, StreamingHttpResponse) +async def _async_streaming_response_content(response: 
StreamingHttpResponse) -> bytes: + data = [] + async for chunk in response: + data.append(chunk) + iterator = response._iterator # type: ignore[attr-defined] + if hasattr(iterator, "aclose"): + await iterator.aclose() + return b"".join(data) + + def close_streaming_response(response: HttpResponseBase) -> bytes: """Exhausts the streamed file in a response. - When the file is exahusted, this underlying file descriptor is closed + When the file is exhausted, the underlying file descriptor is closed avoiding a `ResourceWarning`. """ assert isinstance(response, StreamingHttpResponse) + if response.is_async: + return async_to_sync(_async_streaming_response_content)(response) return response.getvalue() diff --git a/src/sentry/testutils/silo.py b/src/sentry/testutils/silo.py index 98a1be33456bf3..74a79df4be2bd2 100644 --- a/src/sentry/testutils/silo.py +++ b/src/sentry/testutils/silo.py @@ -1,12 +1,12 @@ from __future__ import annotations import contextlib +import contextvars import functools import inspect import os import re import sys -import threading import typing from collections.abc import Callable, Collection, Generator, Iterable, Mapping, MutableSet, Sequence from contextlib import contextmanager, nullcontext @@ -35,46 +35,43 @@ def monkey_patch_single_process_silo_mode_state(): - class LocalSiloModeState(threading.local): - mode: SiloMode | None = None - cell: Cell | None = None - - state = LocalSiloModeState() + _silo_mode_var: contextvars.ContextVar[SiloMode | None] = contextvars.ContextVar( + "silo_mode", default=None + ) + _silo_cell_var: contextvars.ContextVar[Cell | None] = contextvars.ContextVar( + "silo_cell", default=None + ) @contextlib.contextmanager def enter(mode: SiloMode, cell: Cell | None = None) -> Generator[None]: - assert state.mode is None, ( + assert _silo_mode_var.get() is None, ( "Re-entrant invariant broken! Use exit_single_process_silo_context " "to explicit pass 'fake' RPC boundaries." 
) - old_mode = state.mode - old_cell = state.cell - state.mode = mode - state.cell = cell + mode_token = _silo_mode_var.set(mode) + cell_token = _silo_cell_var.set(cell) try: yield finally: - state.mode = old_mode - state.cell = old_cell + _silo_mode_var.reset(mode_token) + _silo_cell_var.reset(cell_token) @contextlib.contextmanager def exit() -> Generator[None]: - old_mode = state.mode - old_cell = state.cell - state.mode = None - state.cell = None + mode_token = _silo_mode_var.set(None) + cell_token = _silo_cell_var.set(None) try: yield finally: - state.mode = old_mode - state.cell = old_cell + _silo_mode_var.reset(mode_token) + _silo_cell_var.reset(cell_token) def get_mode() -> SiloMode | None: - return state.mode + return _silo_mode_var.get() def get_cell() -> Cell | None: - return state.cell + return _silo_cell_var.get() SingleProcessSiloModeState.enter = staticmethod(enter) # type: ignore[method-assign] SingleProcessSiloModeState.exit = staticmethod(exit) # type: ignore[method-assign] diff --git a/src/sentry/utils/http.py b/src/sentry/utils/http.py index 8cf3f48dca9159..53d15211b2d82f 100644 --- a/src/sentry/utils/http.py +++ b/src/sentry/utils/http.py @@ -1,9 +1,10 @@ from __future__ import annotations from collections.abc import Collection, Iterator -from typing import TYPE_CHECKING, NamedTuple, TypeGuard, overload +from typing import TYPE_CHECKING, Any, NamedTuple, TypeGuard, overload from urllib.parse import quote, urljoin, urlparse +from asgiref.sync import sync_to_async from django.conf import settings from django.http import HttpRequest @@ -226,6 +227,32 @@ def is_using_customer_domain(request: HttpRequest) -> TypeGuard[_HttpRequestWith return bool(hasattr(request, "subdomain") and request.subdomain) +class BodyAsyncWrapper: + def __init__(self, body: Any): + self._bool = bool(body) + self.body = [body] if isinstance(body, bytes) else body + + def __bool__(self) -> bool: + return self._bool + + def __aiter__(self): + return BodyAsyncIter(self) + + 
+class BodyAsyncIter: + def __init__(self, parent: BodyAsyncWrapper): + self.biter = iter(parent.body) + + def _anext(self): + try: + return self.biter.__next__() + except StopIteration: + raise StopAsyncIteration + + async def __anext__(self) -> bytes: + return await sync_to_async(self._anext)() + + class BodyWithLength: """Wraps an HttpRequest with a __len__ so that the requests library does not assume length=0 in all cases""" @@ -235,8 +262,25 @@ def __init__(self, request: HttpRequest): def __iter__(self) -> Iterator[bytes]: return iter(self.request) + def __aiter__(self) -> BodyWithLengthAiter: + return BodyWithLengthAiter(self) + def __len__(self) -> int: return int(self.request.headers.get("Content-Length", "0")) def read(self, size: int | None = None) -> bytes: return self.request.read(size) + + +class BodyWithLengthAiter: + def __init__(self, parent: BodyWithLength): + self.biter = iter(parent.request) + + def _anext(self): + try: + return self.biter.__next__() + except StopIteration: + raise StopAsyncIteration + + async def __anext__(self) -> bytes: + return await sync_to_async(self._anext)() diff --git a/src/sentry/utils/sdk.py b/src/sentry/utils/sdk.py index a71e5ba4c11fbf..04d7916bdcb0c9 100644 --- a/src/sentry/utils/sdk.py +++ b/src/sentry/utils/sdk.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import copy import logging import sys @@ -268,6 +269,15 @@ def before_send_log(log: Log, _: Hint) -> Log | None: if attributes.get("sentry.message.template") == "New partitions assigned: %r": return None + try: + # FIXME: when in ASGI, the call to `options.store` from `in_random_rollout` + # would fail, because of SyncOnlyOperation. + # While we should ideally figure out how to actually fix this, + # for the moment let's just simplify and skip this entirely. 
+ asyncio.get_running_loop() + return None + except Exception: + pass if in_random_rollout("ourlogs.sentry-emit-rollout"): return log return None diff --git a/static/AGENTS.md b/static/AGENTS.md index 37374b51c11cf4..f5a441d194e866 100644 --- a/static/AGENTS.md +++ b/static/AGENTS.md @@ -35,18 +35,31 @@ ### Frontend API Calls +Prefer `apiOptions` with `useQuery` from TanStack Query for type-safe, consistent API calls: + ```typescript -import {useApiQuery, type UseApiQueryResult} from 'sentry/utils/queryClient'; +import {skipToken, useQuery} from '@tanstack/react-query'; +import {apiOptions} from 'sentry/utils/api/apiOptions'; + +// Basic usage +const query = useQuery( + apiOptions.as()('/organizations/$organizationIdOrSlug/endpoint/', { + path: {organizationIdOrSlug: organization.slug}, + staleTime: 30_000, + }) +); -const appSizeQuery: UseApiQueryResult = useApiQuery( - [`/projects/${organization.slug}/pull-requests/size-analysis/${selectedBuildId}/`], - { - staleTime: , // Optional, amount of time before data is considered stale (in ms) - enabled: , // Optional, whether the query is enabled - } +// Conditional fetching — pass skipToken as path to disable the query +const query = useQuery( + apiOptions.as()('/organizations/$organizationIdOrSlug/items/$itemId/', { + path: itemId ? {organizationIdOrSlug: organization.slug, itemId} : skipToken, + staleTime: 30_000, + }) ); ``` +Existing code might use `useApiQuery` from `sentry/utils/queryClient` — prefer `apiOptions` for new code. + ## General Frontend Rules 1. 
NO new Reflux stores diff --git a/static/app/utils/api/knownSentryApiUrls.generated.ts b/static/app/utils/api/knownSentryApiUrls.generated.ts index 79a6c8e82e684b..9bd1a50fefc88a 100644 --- a/static/app/utils/api/knownSentryApiUrls.generated.ts +++ b/static/app/utils/api/knownSentryApiUrls.generated.ts @@ -721,6 +721,7 @@ export type KnownSentryApiUrls = | '/projects/$organizationIdOrSlug/$projectIdOrSlug/seer/preferences/' | '/projects/$organizationIdOrSlug/$projectIdOrSlug/stacktrace-coverage/' | '/projects/$organizationIdOrSlug/$projectIdOrSlug/stacktrace-link/' + | '/projects/$organizationIdOrSlug/$projectIdOrSlug/stacktrace-source-context/' | '/projects/$organizationIdOrSlug/$projectIdOrSlug/statistical-detector/' | '/projects/$organizationIdOrSlug/$projectIdOrSlug/stats/' | '/projects/$organizationIdOrSlug/$projectIdOrSlug/symbol-sources/' diff --git a/static/app/views/insights/pages/conversations/hooks/useConversations.tsx b/static/app/views/insights/pages/conversations/hooks/useConversations.tsx index 84aeb39a95e948..1576103ffa9fa1 100644 --- a/static/app/views/insights/pages/conversations/hooks/useConversations.tsx +++ b/static/app/views/insights/pages/conversations/hooks/useConversations.tsx @@ -72,13 +72,15 @@ export function useConversations() { const pageLinks = getResponseHeader?.('Link'); const data = useMemo(() => { - return (rawData ?? []).map(({firstInput: rawFirstInput, ...rest}): Conversation => { - const firstInput = - typeof rawFirstInput === 'string' - ? rawFirstInput - : (rawFirstInput?.find(content => content.type === 'text')?.text ?? null); - return {...rest, firstInput}; - }); + return (rawData ?? []) + .map(({firstInput: rawFirstInput, ...rest}): Conversation => { + const firstInput = + typeof rawFirstInput === 'string' + ? rawFirstInput + : (rawFirstInput?.find(content => content.type === 'text')?.text ?? 
null); + return {...rest, firstInput}; + }) + .sort((a, b) => b.endTimestamp - a.endTimestamp); }, [rawData]); return { diff --git a/tests/acceptance/test_proxy.py b/tests/acceptance/test_proxy.py index b49285d62305f5..98c8a39871bd6d 100644 --- a/tests/acceptance/test_proxy.py +++ b/tests/acceptance/test_proxy.py @@ -17,6 +17,7 @@ from sentry.testutils.cases import TransactionTestCase from sentry.testutils.cell import override_cells from sentry.testutils.factories import Factories +from sentry.testutils.helpers.response import close_streaming_response from sentry.testutils.silo import cell_silo_test from sentry.types.cell import Cell from sentry.utils import json @@ -69,6 +70,6 @@ def test_through_api_gateway(self) -> None: ) assert_status_code(resp, 201) - result = json.loads(resp.getvalue()) + result = json.loads(close_streaming_response(resp)) team = Team.objects.get(id=result["id"]) assert team.idp_provisioned diff --git a/tests/conftest.py b/tests/conftest.py index 4f47d7f85ccffe..efb241a151aae9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -9,6 +9,9 @@ import responses import sentry_sdk +# Set async apigateway as soon as possible +os.environ["SENTRY_APIGW_ASYNC"] = "true" + # Disable crash recovery server in pytest-rerunfailures. Under xdist, Sentry's # global socket.setdefaulttimeout(5) causes the server's per-worker recv threads # to die during Django init (~10s), silently breaking crash recovery anyway. 
diff --git a/tests/sentry/hybridcloud/apigateway/test_apigateway.py b/tests/sentry/hybridcloud/apigateway/test_apigateway.py index 723305c5ab8568..f59eedc88d2fde 100644 --- a/tests/sentry/hybridcloud/apigateway/test_apigateway.py +++ b/tests/sentry/hybridcloud/apigateway/test_apigateway.py @@ -1,7 +1,6 @@ from urllib.parse import urlencode import pytest -import responses from django.conf import settings from django.test import override_settings from django.urls import get_resolver, reverse @@ -16,12 +15,11 @@ @control_silo_test(cells=[ApiGatewayTestCase.CELL], include_monolith_run=True) class ApiGatewayTest(ApiGatewayTestCase): - @responses.activate def test_simple(self) -> None: query_params = dict(foo="test", bar=["one", "two"]) headers = dict(example="this") - responses.add_callback( - responses.GET, + self.httpx_router.add_callback( + "GET", f"{self.CELL.address}/organizations/{self.organization.slug}/region/", verify_request_params(query_params, headers), ) @@ -40,13 +38,12 @@ def test_simple(self) -> None: resp_json = json.loads(close_streaming_response(resp)) assert resp_json["proxy"] is True - @responses.activate def test_proxy_does_not_resolve_redirect(self) -> None: - responses.add( - responses.POST, + self.httpx_router.add( + "POST", f"{self.CELL.address}/organizations/{self.organization.slug}/region/", headers={"Location": "https://zombo.com"}, - status=302, + status_code=302, ) url = reverse("region-endpoint", kwargs={"organization_slug": self.organization.slug}) @@ -61,7 +58,6 @@ def test_proxy_does_not_resolve_redirect(self) -> None: response_payload = close_streaming_response(resp) assert response_payload == b"" - @responses.activate def test_cell_pinned_urls_are_defined(self) -> None: resolver = get_resolver() # Ensure that all urls in REGION_PINNED_URL_NAMES exist in api/urls.py @@ -73,18 +69,17 @@ def test_cell_pinned_urls_are_defined(self) -> None: f"REGION_PINNED_URL_NAMES contains {name}, but no route is registered with that name" ) - 
@responses.activate def test_proxy_check_org_slug_url(self) -> None: """Test the logic of when a request should be proxied""" - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/organizations/{self.organization.slug}/region/", - json={"proxy": True}, + json_data={"proxy": True}, ) - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/organizations/{self.organization.slug}/control/", - json={"proxy": True}, + json_data={"proxy": True}, ) region_url = reverse( @@ -109,28 +104,27 @@ def test_proxy_check_org_slug_url(self) -> None: assert resp.status_code == 200 assert resp.data["proxy"] is False - @responses.activate def test_proxy_check_org_id_or_slug_url_with_params(self) -> None: """Test the logic of when a request should be proxied""" - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/organizations/{self.organization.slug}/region/", - json={"proxy": True}, + json_data={"proxy": True}, ) - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/organizations/{self.organization.slug}/control/", - json={"proxy": True}, + json_data={"proxy": True}, ) - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/organizations/{self.organization.id}/region/", - json={"proxy": True}, + json_data={"proxy": True}, ) - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/organizations/{self.organization.id}/control/", - json={"proxy": True}, + json_data={"proxy": True}, ) region_url_slug = reverse( @@ -178,13 +172,12 @@ def test_proxy_check_org_id_or_slug_url_with_params(self) -> None: assert resp.status_code == 200 assert resp.data["proxy"] is False - @responses.activate def test_proxy_check_region_pinned_url(self) -> None: project_key = self.create_project_key(self.project) - responses.add( - responses.GET, + self.httpx_router.add( + "GET", 
f"{self.CELL.address}/js-sdk-loader/{project_key.public_key}.js", - json={"proxy": True}, + json_data={"proxy": True}, ) # No /api/0 as we only include sentry.api.urls.urlpatterns @@ -205,17 +198,16 @@ def test_proxy_check_region_pinned_url(self) -> None: assert resp.status_code == 200 assert resp.data["proxy"] is False - @responses.activate def test_proxy_check_cell_pinned_url_with_params(self) -> None: - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/relays/register/", - json={"proxy": True}, + json_data={"proxy": True}, ) - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/relays/abc123/", - json={"proxy": True, "details": True}, + json_data={"proxy": True, "details": True}, ) with override_settings(SILO_MODE=SiloMode.CONTROL, MIDDLEWARE=tuple(self.middleware)): @@ -230,18 +222,17 @@ def test_proxy_check_cell_pinned_url_with_params(self) -> None: assert resp_json["proxy"] is True assert resp_json["details"] is True - @responses.activate def test_proxy_check_cell_pinned_issue_urls(self) -> None: issue = self.create_group() - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/issues/{issue.id}/", - json={"proxy": True, "id": issue.id}, + json_data={"proxy": True, "id": issue.id}, ) - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/issues/{issue.id}/events/", - json={"proxy": True, "id": issue.id, "events": True}, + json_data={"proxy": True, "id": issue.id, "events": True}, ) # No /api/0 as we only include sentry.api.urls.urlpatterns @@ -262,12 +253,11 @@ def test_proxy_check_cell_pinned_issue_urls(self) -> None: assert resp_json["proxy"] is True assert resp_json["events"] - @responses.activate def test_proxy_error_embed_dsn(self) -> None: - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/api/embed/error-page/", - json={"proxy": True, "name": "error-embed"}, + 
json_data={"proxy": True, "name": "error-embed"}, ) with override_settings(SILO_MODE=SiloMode.CONTROL, MIDDLEWARE=tuple(self.middleware)): # no dsn diff --git a/tests/sentry/hybridcloud/apigateway/test_apigateway_helpers.py b/tests/sentry/hybridcloud/apigateway/test_apigateway_helpers.py index f07005d6d73707..2064d9863d822d 100644 --- a/tests/sentry/hybridcloud/apigateway/test_apigateway_helpers.py +++ b/tests/sentry/hybridcloud/apigateway/test_apigateway_helpers.py @@ -1,7 +1,4 @@ -from io import BytesIO - -import requests -import responses +import httpx from sentry.testutils.helpers.apigateway import ApiGatewayTestCase, verify_request_body from sentry.testutils.silo import no_silo_test @@ -10,14 +7,16 @@ @no_silo_test(cells=[ApiGatewayTestCase.CELL]) class VerifyRequestBodyTest(ApiGatewayTestCase): - @responses.activate def test_verify_request_body(self) -> None: body = {"ab": "cd"} headers = {"header": "nope", "content-type": "application/json"} - responses.add_callback( - responses.POST, "http://ab.cd.e/test", verify_request_body(body, headers) - ) - resp = requests.post( - "http://ab.cd.e/test", data=BytesIO(json.dumps(body).encode("utf8")), headers=headers + callback = verify_request_body(body, headers) + + mock_request = httpx.Request( + "POST", + "http://ab.cd.e/test", + headers=headers, + content=json.dumps(body).encode("utf8"), ) - assert resp.status_code == 200 + status_code, resp_headers, resp_body = callback(mock_request) + assert status_code == 200 diff --git a/tests/sentry/hybridcloud/apigateway/test_proxy.py b/tests/sentry/hybridcloud/apigateway/test_proxy.py index c8a66042dfc74d..68bbabf4645910 100644 --- a/tests/sentry/hybridcloud/apigateway/test_proxy.py +++ b/tests/sentry/hybridcloud/apigateway/test_proxy.py @@ -1,22 +1,16 @@ -from unittest.mock import MagicMock, patch from urllib.parse import urlencode -import pytest -import requests -import responses +import httpx +from asgiref.sync import async_to_sync from django.core.files.uploadedfile 
import SimpleUploadedFile -from django.http import HttpResponse from django.test.client import RequestFactory -from requests.exceptions import ConnectionError, Timeout -from sentry.api.exceptions import RequestTimeout -from sentry.hybridcloud.apigateway.proxy import proxy_request +from sentry.hybridcloud.apigateway_async.proxy import proxy_request as _proxy_request from sentry.silo.util import ( INVALID_OUTBOUND_HEADERS, PROXY_APIGATEWAY_HEADER, PROXY_DIRECT_LOCATION_HEADER, ) -from sentry.testutils.helpers import override_options from sentry.testutils.helpers.apigateway import ( ApiGatewayTestCase, verify_file_body, @@ -27,12 +21,12 @@ from sentry.testutils.silo import control_silo_test from sentry.utils import json +proxy_request = async_to_sync(_proxy_request) url_name = "sentry-api-0-projets" @control_silo_test(cells=[ApiGatewayTestCase.CELL], include_monolith_run=True) class ProxyTestCase(ApiGatewayTestCase): - @responses.activate def test_simple(self) -> None: request = RequestFactory().get("http://sentry.io/get") resp = proxy_request(request, self.organization.slug, url_name) @@ -54,7 +48,6 @@ def test_simple(self) -> None: assert resp.has_header(PROXY_DIRECT_LOCATION_HEADER) assert resp[PROXY_DIRECT_LOCATION_HEADER] == "http://us.internal.sentry.io/error" - @responses.activate def test_query_params(self) -> None: query_param_dict = dict(foo="bar", numlist=["1", "2", "3"]) query_param_str = urlencode(query_param_dict, doseq=True) @@ -68,17 +61,15 @@ def test_query_params(self) -> None: assert query_param_dict["foo"] == resp_json["foo"][0] assert query_param_dict["numlist"] == resp_json["numlist"] - @responses.activate def test_bad_org(self) -> None: request = RequestFactory().get("http://sentry.io/get") resp = proxy_request(request, "doesnotexist", url_name) assert resp.status_code == 404 - @responses.activate def test_post(self) -> None: request_body = {"foo": "bar", "nested": {"int_list": [1, 2, 3]}} - responses.add_callback( - responses.POST, + 
self.httpx_router.add_callback( + "POST", "http://us.internal.sentry.io/post", verify_request_body(request_body, {"test": "header"}), ) @@ -94,11 +85,10 @@ def test_post(self) -> None: assert resp.has_header(PROXY_DIRECT_LOCATION_HEADER) assert resp[PROXY_DIRECT_LOCATION_HEADER] == "http://us.internal.sentry.io/post" - @responses.activate def test_put(self) -> None: request_body = {"foo": "bar", "nested": {"int_list": [1, 2, 3]}} - responses.add_callback( - responses.PUT, + self.httpx_router.add_callback( + "PUT", "http://us.internal.sentry.io/put", verify_request_body(request_body, {"test": "header"}), ) @@ -114,11 +104,10 @@ def test_put(self) -> None: assert resp.has_header(PROXY_DIRECT_LOCATION_HEADER) assert resp[PROXY_DIRECT_LOCATION_HEADER] == "http://us.internal.sentry.io/put" - @responses.activate def test_patch(self) -> None: request_body = {"foo": "bar", "nested": {"int_list": [1, 2, 3]}} - responses.add_callback( - responses.PATCH, + self.httpx_router.add_callback( + "PATCH", "http://us.internal.sentry.io/patch", verify_request_body(request_body, {"test": "header"}), ) @@ -134,11 +123,10 @@ def test_patch(self) -> None: assert resp.has_header(PROXY_DIRECT_LOCATION_HEADER) assert resp[PROXY_DIRECT_LOCATION_HEADER] == "http://us.internal.sentry.io/patch" - @responses.activate def test_head(self) -> None: request_body = {"foo": "bar", "nested": {"int_list": [1, 2, 3]}} - responses.add_callback( - responses.HEAD, + self.httpx_router.add_callback( + "HEAD", "http://us.internal.sentry.io/head", verify_request_headers({"test": "header"}), ) @@ -154,11 +142,10 @@ def test_head(self) -> None: assert resp.has_header(PROXY_DIRECT_LOCATION_HEADER) assert resp[PROXY_DIRECT_LOCATION_HEADER] == "http://us.internal.sentry.io/head" - @responses.activate def test_delete(self) -> None: request_body = {"foo": "bar", "nested": {"int_list": [1, 2, 3]}} - responses.add_callback( - responses.DELETE, + self.httpx_router.add_callback( + "DELETE", 
"http://us.internal.sentry.io/delete", verify_request_body(request_body, {"test": "header"}), ) @@ -174,7 +161,6 @@ def test_delete(self) -> None: assert resp.has_header(PROXY_DIRECT_LOCATION_HEADER) assert resp[PROXY_DIRECT_LOCATION_HEADER] == "http://us.internal.sentry.io/delete" - @responses.activate def test_file_upload(self) -> None: foo = dict(test="a", file="b", what="c") contents = json.dumps(foo).encode() @@ -183,8 +169,8 @@ def test_file_upload(self) -> None: "foo": "bar", } - responses.add_callback( - responses.POST, + self.httpx_router.add_callback( + "POST", "http://us.internal.sentry.io/post", verify_file_body(contents, {"test": "header"}), ) @@ -197,14 +183,13 @@ def test_file_upload(self) -> None: assert resp.status_code == 200 assert resp_json["proxy"] - @responses.activate def test_alternate_content_type(self) -> None: # Check form encoded files also work foo = dict(test="a", file="b", what="c") contents = urlencode(foo, doseq=True).encode("utf-8") request_body = contents - responses.add_callback( - responses.POST, + self.httpx_router.add_callback( + "POST", "http://us.internal.sentry.io/post", verify_request_body(contents, {"test": "header"}), ) @@ -219,16 +204,15 @@ def test_alternate_content_type(self) -> None: assert resp.status_code == 200 assert resp_json["proxy"] - @responses.activate def test_apply_apigateway_proxy_header(self) -> None: - def request_callback(request: requests.PreparedRequest) -> tuple[int, dict[str, str], str]: + def request_callback(request: httpx.Request) -> tuple[int, dict[str, str], str]: assert request.headers.get(PROXY_APIGATEWAY_HEADER), ( "Proxied requests should have a header added" ) return 200, {"proxied": "yes"}, json.dumps({"success": True}) - responses.add_callback( - responses.POST, + self.httpx_router.add_callback( + "POST", "http://us.internal.sentry.io/post", request_callback, ) @@ -243,11 +227,10 @@ def request_callback(request: requests.PreparedRequest) -> tuple[int, dict[str, assert resp.status_code 
== 200 assert resp["proxied"] == "yes" - @responses.activate def test_strip_request_headers(self) -> None: request_body = {"foo": "bar", "nested": {"int_list": [1, 2, 3]}} - responses.add_callback( - responses.POST, + self.httpx_router.add_callback( + "POST", "http://us.internal.sentry.io/post", verify_request_body(request_body, {"test": "header"}), ) @@ -267,176 +250,3 @@ def test_strip_request_headers(self) -> None: resp = proxy_request(request, self.organization.slug, url_name) assert not any([header in resp for header in INVALID_OUTBOUND_HEADERS]) - - -CB_ENABLED = { - "apigateway.proxy.circuit-breaker.enabled": True, - "apigateway.proxy.circuit-breaker.enforce": True, -} - - -@control_silo_test(cells=[ApiGatewayTestCase.CELL]) -class ProxyCircuitBreakerTestCase(ApiGatewayTestCase): - def _make_breaker_mock(self, *, allow_request: bool) -> MagicMock: - mock_breaker = MagicMock() - mock_breaker.should_allow_request.return_value = allow_request - return mock_breaker - - @responses.activate - @override_options(CB_ENABLED) - def test_open_circuit_returns_503(self) -> None: - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - mock_breaker_class.return_value = self._make_breaker_mock(allow_request=False) - request = RequestFactory().get("http://sentry.io/get") - resp = proxy_request(request, self.organization.slug, url_name) - assert isinstance(resp, HttpResponse) - assert resp.status_code == 503 - assert json.loads(resp.content) == { - "error": "apigateway", - "detail": "Downstream service temporarily unavailable", - } - - @responses.activate - @override_options(CB_ENABLED) - def test_circuit_breaker_keyed_per_cell(self) -> None: - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - mock_breaker_class.return_value = self._make_breaker_mock(allow_request=False) - request = RequestFactory().get("http://sentry.io/get") - proxy_request(request, self.organization.slug, url_name) - key_used = 
mock_breaker_class.call_args.kwargs["key"] - assert key_used == f"apigateway.proxy.{self.CELL.name}" - - @responses.activate - def test_circuit_breaker_disabled_by_default(self) -> None: - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - request = RequestFactory().get("http://sentry.io/get") - proxy_request(request, self.organization.slug, url_name) - mock_breaker_class.assert_not_called() - - @responses.activate - @override_options( - { - "apigateway.proxy.circuit-breaker.enabled": True, - "apigateway.proxy.circuit-breaker.enforce": False, - } - ) - def test_open_circuit_not_enforced(self) -> None: - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - mock_breaker_class.return_value = self._make_breaker_mock(allow_request=False) - request = RequestFactory().get("http://sentry.io/get") - resp = proxy_request(request, self.organization.slug, url_name) - assert resp.status_code == 200 - - @responses.activate - @override_options({"apigateway.proxy.circuit-breaker.config": "invalid-lol", **CB_ENABLED}) - def test_handles_invalid_config(self) -> None: - request = RequestFactory().get("http://sentry.io/get") - res = proxy_request(request, self.organization.slug, url_name) - assert res.status_code == 200 - - @responses.activate - @override_options(CB_ENABLED) - def test_timeout_records_error(self) -> None: - responses.add( - responses.GET, - f"{self.CELL.address}/timeout", - body=Timeout(), - ) - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - mock_breaker = self._make_breaker_mock(allow_request=True) - mock_breaker_class.return_value = mock_breaker - request = RequestFactory().get("http://sentry.io/timeout") - with pytest.raises(RequestTimeout): - proxy_request(request, self.organization.slug, url_name) - mock_breaker.record_error.assert_called_once() - - @responses.activate - @override_options(CB_ENABLED) - def 
test_connection_error_records_error(self) -> None: - responses.add( - responses.GET, - f"{self.CELL.address}/connect-error", - body=ConnectionError(), - ) - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - mock_breaker = self._make_breaker_mock(allow_request=True) - mock_breaker_class.return_value = mock_breaker - request = RequestFactory().get("http://sentry.io/connect-error") - with pytest.raises(ConnectionError): - proxy_request(request, self.organization.slug, url_name) - mock_breaker.record_error.assert_called_once() - - @responses.activate - @override_options(CB_ENABLED) - def test_connection_error_records_metric(self) -> None: - responses.add( - responses.GET, - f"{self.CELL.address}/connect-error", - body=ConnectionError(), - ) - with patch("sentry.hybridcloud.apigateway.proxy.metrics") as mock_metrics: - request = RequestFactory().get("http://sentry.io/connect-error") - with pytest.raises(ConnectionError): - proxy_request(request, self.organization.slug, url_name) - mock_metrics.incr.assert_any_call( - "apigateway.proxy.connection_error", - tags={"region": self.CELL.name, "url_name": url_name}, - ) - - @responses.activate - @override_options(CB_ENABLED) - def test_504_response_does_record_error(self) -> None: - responses.add( - responses.GET, - f"{self.CELL.address}/server-error", - status=504, - body=json.dumps({"detail": "gateway timeout"}), - content_type="application/json", - ) - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - mock_breaker = self._make_breaker_mock(allow_request=True) - mock_breaker_class.return_value = mock_breaker - request = RequestFactory().get("http://sentry.io/server-error") - resp = proxy_request(request, self.organization.slug, url_name) - assert resp.status_code == 504 - mock_breaker.record_error.assert_called_once() - - @responses.activate - @override_options(CB_ENABLED) - def test_500_response_does_not_record_error(self) -> None: - 
responses.add( - responses.GET, - f"{self.CELL.address}/server-error", - status=500, - body=json.dumps({"detail": "internal server error"}), - content_type="application/json", - ) - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - mock_breaker = self._make_breaker_mock(allow_request=True) - mock_breaker_class.return_value = mock_breaker - request = RequestFactory().get("http://sentry.io/server-error") - resp = proxy_request(request, self.organization.slug, url_name) - assert resp.status_code == 500 - mock_breaker.record_error.assert_not_called() - - @responses.activate - @override_options(CB_ENABLED) - def test_4xx_response_does_not_record_error(self) -> None: - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - mock_breaker = self._make_breaker_mock(allow_request=True) - mock_breaker_class.return_value = mock_breaker - request = RequestFactory().get("http://sentry.io/error") - resp = proxy_request(request, self.organization.slug, url_name) - assert resp.status_code == 400 - mock_breaker.record_error.assert_not_called() - - @responses.activate - @override_options(CB_ENABLED) - def test_2xx_response_does_not_record_error(self) -> None: - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - mock_breaker = self._make_breaker_mock(allow_request=True) - mock_breaker_class.return_value = mock_breaker - request = RequestFactory().get("http://sentry.io/get") - resp = proxy_request(request, self.organization.slug, url_name) - assert resp.status_code == 200 - mock_breaker.record_error.assert_not_called() diff --git a/tests/sentry/integrations/utils/test_source_context.py b/tests/sentry/integrations/utils/test_source_context.py new file mode 100644 index 00000000000000..b16c671b746626 --- /dev/null +++ b/tests/sentry/integrations/utils/test_source_context.py @@ -0,0 +1,224 @@ +from __future__ import annotations + +from unittest.mock import MagicMock, patch + 
+from sentry.integrations.utils.source_context import ( + _format_context, + fetch_source_context_from_scm, +) +from sentry.issues.endpoints.project_stacktrace_link import StacktraceLinkContext +from sentry.shared_integrations.exceptions import ApiError, ApiRateLimitedError +from sentry.silo.base import SiloMode +from sentry.testutils.cases import TestCase +from sentry.testutils.silo import assume_test_silo_mode + + +class FormatContextTest(TestCase): + def test_format_context_basic(self) -> None: + pre = [b"line1", b"line2"] + ctx_line = b"line3" + post = [b"line4", b"line5"] + result = _format_context(pre, ctx_line, post, lineno=3) + assert result == [ + [1, "line1"], + [2, "line2"], + [3, "line3"], + [4, "line4"], + [5, "line5"], + ] + + def test_format_context_no_pre_or_post(self) -> None: + result = _format_context(None, b"only_line", None, lineno=1) + assert result == [[1, "only_line"]] + + def test_format_context_none_context_line(self) -> None: + result = _format_context(None, None, None, lineno=1) + assert result == [] + + def test_format_context_utf8_decode(self) -> None: + result = _format_context(None, "héllo".encode(), None, lineno=1) + assert result == [[1, "héllo"]] + + +class FetchSourceContextTest(TestCase): + def setUp(self) -> None: + super().setUp() + with assume_test_silo_mode(SiloMode.CONTROL): + self.integration, self.oi = self.create_provider_integration_for( + self.organization, self.user, provider="example", name="Example" + ) + + self.repo = self.create_repo( + project=self.project, + name="getsentry/sentry", + ) + self.repo.integration_id = self.integration.id + self.repo.provider = "example" + self.repo.save() + + self.code_mapping = self.create_code_mapping( + organization_integration=self.oi, + project=self.project, + repo=self.repo, + stack_root="src/", + source_root="src/", + ) + + def _make_ctx(self, **overrides: str | None) -> StacktraceLinkContext: + defaults: StacktraceLinkContext = { + "file": "src/app/main.py", + "filename": 
"src/app/main.py", + "platform": "python", + "abs_path": "src/app/main.py", + "commit_id": None, + "group_id": None, + "line_no": "10", + "module": None, + "package": None, + "sdk_name": None, + } + defaults.update(overrides) # type: ignore[typeddict-item] + return defaults + + def test_missing_line_number(self) -> None: + ctx = self._make_ctx(line_no=None) + result = fetch_source_context_from_scm([self.code_mapping], ctx) + assert result["error"] == "missing_line_number" + assert result["context"] == [] + + def test_invalid_line_number(self) -> None: + ctx = self._make_ctx(line_no="abc") + result = fetch_source_context_from_scm([self.code_mapping], ctx) + assert result["error"] == "invalid_line_number" + assert result["context"] == [] + + def test_no_code_mapping_match(self) -> None: + ctx = self._make_ctx( + file="unknown/path.py", filename="unknown/path.py", abs_path="unknown/path.py" + ) + result = fetch_source_context_from_scm([self.code_mapping], ctx) + assert result["error"] == "no_code_mapping_match" + assert result["context"] == [] + + @patch("sentry.integrations.utils.source_context.integration_service") + def test_integration_not_found(self, mock_service: MagicMock) -> None: + mock_service.get_integration.return_value = None + ctx = self._make_ctx() + result = fetch_source_context_from_scm([self.code_mapping], ctx) + assert result["error"] == "no_code_mapping_match" + + @patch("sentry.integrations.utils.source_context.integration_service") + def test_successful_fetch(self, mock_service: MagicMock) -> None: + file_content = "\n".join([f"line{i}" for i in range(1, 20)]) + + mock_integration = MagicMock() + mock_service.get_integration.return_value = mock_integration + mock_install = MagicMock() + mock_integration.get_installation.return_value = mock_install + mock_client = MagicMock() + mock_install.get_client.return_value = mock_client + mock_client.get_file.return_value = file_content + mock_install.get_stacktrace_link.return_value = 
"https://github.com/example" + + # Make mock pass isinstance check + from sentry.integrations.source_code_management.repository import RepositoryIntegration + + mock_install.__class__ = RepositoryIntegration # type: ignore[assignment] + + ctx = self._make_ctx(line_no="10") + result = fetch_source_context_from_scm([self.code_mapping], ctx) + + assert result["error"] is None + assert len(result["context"]) == 11 # 5 pre + 1 context + 5 post + assert result["context"][5] == [10, "line10"] + assert result["source_url"] == "https://github.com/example" + + @patch("sentry.integrations.utils.source_context.integration_service") + def test_get_file_not_supported(self, mock_service: MagicMock) -> None: + mock_integration = MagicMock() + mock_service.get_integration.return_value = mock_integration + mock_install = MagicMock() + mock_integration.get_installation.return_value = mock_install + mock_client = MagicMock() + mock_install.get_client.return_value = mock_client + mock_client.get_file.side_effect = NotImplementedError + + from sentry.integrations.source_code_management.repository import RepositoryIntegration + + mock_install.__class__ = RepositoryIntegration # type: ignore[assignment] + + ctx = self._make_ctx() + result = fetch_source_context_from_scm([self.code_mapping], ctx) + assert result["error"] == "get_file_not_supported" + + @patch("sentry.integrations.utils.source_context.integration_service") + def test_rate_limited(self, mock_service: MagicMock) -> None: + mock_integration = MagicMock() + mock_service.get_integration.return_value = mock_integration + mock_install = MagicMock() + mock_integration.get_installation.return_value = mock_install + mock_client = MagicMock() + mock_install.get_client.return_value = mock_client + mock_client.get_file.side_effect = ApiRateLimitedError("rate limited") + + from sentry.integrations.source_code_management.repository import RepositoryIntegration + + mock_install.__class__ = RepositoryIntegration # type: ignore[assignment] + 
+ ctx = self._make_ctx() + result = fetch_source_context_from_scm([self.code_mapping], ctx) + assert result["error"] == "rate_limited" + + @patch("sentry.integrations.utils.source_context.integration_service") + def test_file_not_found(self, mock_service: MagicMock) -> None: + mock_integration = MagicMock() + mock_service.get_integration.return_value = mock_integration + mock_install = MagicMock() + mock_integration.get_installation.return_value = mock_install + mock_client = MagicMock() + mock_install.get_client.return_value = mock_client + mock_client.get_file.side_effect = ApiError("Not Found", code=404) + + from sentry.integrations.source_code_management.repository import RepositoryIntegration + + mock_install.__class__ = RepositoryIntegration # type: ignore[assignment] + + ctx = self._make_ctx() + result = fetch_source_context_from_scm([self.code_mapping], ctx) + assert result["error"] == "file_not_found" + + @patch("sentry.integrations.utils.source_context.integration_service") + def test_line_out_of_range(self, mock_service: MagicMock) -> None: + file_content = "line1\nline2\nline3" + + mock_integration = MagicMock() + mock_service.get_integration.return_value = mock_integration + mock_install = MagicMock() + mock_integration.get_installation.return_value = mock_install + mock_client = MagicMock() + mock_install.get_client.return_value = mock_client + mock_client.get_file.return_value = file_content + + from sentry.integrations.source_code_management.repository import RepositoryIntegration + + mock_install.__class__ = RepositoryIntegration # type: ignore[assignment] + + ctx = self._make_ctx(line_no="100") + result = fetch_source_context_from_scm([self.code_mapping], ctx) + assert result["error"] == "line_out_of_range" + + @patch("sentry.integrations.utils.source_context.integration_service") + def test_get_client_exception(self, mock_service: MagicMock) -> None: + mock_integration = MagicMock() + mock_service.get_integration.return_value = mock_integration + 
mock_install = MagicMock() + mock_integration.get_installation.return_value = mock_install + mock_install.get_client.side_effect = Exception("identity not found") + + from sentry.integrations.source_code_management.repository import RepositoryIntegration + + mock_install.__class__ = RepositoryIntegration # type: ignore[assignment] + + ctx = self._make_ctx() + result = fetch_source_context_from_scm([self.code_mapping], ctx) + assert result["error"] == "integration_error" diff --git a/tests/sentry/issues/endpoints/test_project_stacktrace_source_context.py b/tests/sentry/issues/endpoints/test_project_stacktrace_source_context.py new file mode 100644 index 00000000000000..f7415941c12a83 --- /dev/null +++ b/tests/sentry/issues/endpoints/test_project_stacktrace_source_context.py @@ -0,0 +1,143 @@ +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +from sentry.silo.base import SiloMode +from sentry.testutils.cases import APITestCase +from sentry.testutils.silo import assume_test_silo_mode + + +class ProjectStacktraceSourceContextTest(APITestCase): + endpoint = "sentry-api-0-project-stacktrace-source-context" + + def setUp(self) -> None: + super().setUp() + with assume_test_silo_mode(SiloMode.CONTROL): + self.integration, self.oi = self.create_provider_integration_for( + self.organization, self.user, provider="example", name="Example" + ) + + self.repo = self.create_repo( + project=self.project, + name="getsentry/sentry", + ) + self.repo.integration_id = self.integration.id + self.repo.provider = "example" + self.repo.save() + + self.code_mapping = self.create_code_mapping( + organization_integration=self.oi, + project=self.project, + repo=self.repo, + stack_root="src/", + source_root="src/", + ) + + self.login_as(self.user) + self.project.update_option("sentry:scm_source_context_enabled", True) + + def test_feature_flag_required(self) -> None: + response = self.get_response( + self.organization.slug, + self.project.slug, + 
qs_params={"file": "src/main.py", "lineNo": "10", "platform": "python"}, + ) + assert response.status_code == 404 + + def test_project_option_required(self) -> None: + self.project.update_option("sentry:scm_source_context_enabled", False) + with self.feature("organizations:scm-source-context"): + response = self.get_response( + self.organization.slug, + self.project.slug, + qs_params={"file": "src/main.py", "lineNo": "10", "platform": "python"}, + ) + assert response.status_code == 404 + + def test_missing_filepath(self) -> None: + with self.feature("organizations:scm-source-context"): + response = self.get_response( + self.organization.slug, + self.project.slug, + qs_params={"lineNo": "10", "platform": "python"}, + ) + assert response.status_code == 400 + + def test_missing_lineno(self) -> None: + with self.feature("organizations:scm-source-context"): + response = self.get_response( + self.organization.slug, + self.project.slug, + qs_params={"file": "src/main.py", "platform": "python"}, + ) + assert response.status_code == 400 + + def test_no_code_mappings(self) -> None: + self.code_mapping.delete() + with self.feature("organizations:scm-source-context"): + response = self.get_success_response( + self.organization.slug, + self.project.slug, + qs_params={"file": "src/main.py", "lineNo": "10", "platform": "python"}, + ) + assert response.data["error"] == "no_code_mappings_for_project" + assert response.data["context"] == [] + + @patch("sentry.integrations.utils.source_context.integration_service") + def test_successful_fetch(self, mock_service: MagicMock) -> None: + file_content = "\n".join([f"line{i}" for i in range(1, 20)]) + + mock_integration = MagicMock() + mock_service.get_integration.return_value = mock_integration + mock_install = MagicMock() + mock_integration.get_installation.return_value = mock_install + mock_client = MagicMock() + mock_install.get_client.return_value = mock_client + mock_client.get_file.return_value = file_content + 
mock_install.get_stacktrace_link.return_value = "https://github.com/file" + + from sentry.integrations.source_code_management.repository import RepositoryIntegration + + mock_install.__class__ = RepositoryIntegration # type: ignore[assignment] + + with self.feature("organizations:scm-source-context"): + response = self.get_success_response( + self.organization.slug, + self.project.slug, + qs_params={ + "file": "src/app/main.py", + "lineNo": "10", + "platform": "python", + }, + ) + assert response.data["error"] is None + assert len(response.data["context"]) == 11 + assert response.data["context"][5] == [10, "line10"] + assert response.data["sourceUrl"] == "https://github.com/file" + + @patch("sentry.integrations.utils.source_context.integration_service") + def test_file_not_found(self, mock_service: MagicMock) -> None: + from sentry.integrations.source_code_management.repository import RepositoryIntegration + from sentry.shared_integrations.exceptions import ApiError + + mock_integration = MagicMock() + mock_service.get_integration.return_value = mock_integration + mock_install = MagicMock() + mock_integration.get_installation.return_value = mock_install + mock_install.__class__ = RepositoryIntegration # type: ignore[assignment] + mock_client = MagicMock() + mock_install.get_client.return_value = mock_client + mock_client.get_file.side_effect = ApiError("Not Found", code=404) + + with self.feature("organizations:scm-source-context"): + response = self.get_success_response( + self.organization.slug, + self.project.slug, + qs_params={ + "file": "src/app/main.py", + "lineNo": "10", + "platform": "python", + }, + ) + assert response.data["error"] == "file_not_found" + assert response.data["context"] == [] diff --git a/tests/sentry/middleware/test_proxy.py b/tests/sentry/middleware/test_proxy.py index 09043bbe2b5838..57406f423b0def 100644 --- a/tests/sentry/middleware/test_proxy.py +++ b/tests/sentry/middleware/test_proxy.py @@ -1,6 +1,8 @@ from __future__ import 
annotations +import asyncio from functools import cached_property +from unittest.mock import patch from django.http import HttpRequest @@ -8,6 +10,7 @@ from sentry.models.team import Team from sentry.silo.base import SiloMode from sentry.testutils.cases import APITestCase, TestCase +from sentry.testutils.helpers.response import close_streaming_response from sentry.testutils.silo import assume_test_silo_mode, control_silo_test from sentry.types.cell import Cell, RegionCategory from sentry.utils import json @@ -48,6 +51,29 @@ class FakedAPIProxyTest(APITestCase): endpoint = "sentry-api-0-organization-teams" method = "post" + def setUp(self) -> None: + super().setUp() + + from sentry.hybridcloud.apigateway_async.middleware import ApiGatewayMiddleware + + _original_middleware = ApiGatewayMiddleware._process_view_inner + + def _process_view_match(self, request, view_func, view_args, view_kwargs): + try: + asyncio.get_running_loop() + return self._process_view_inner(request, view_func, view_args, view_kwargs) + except RuntimeError: + return self._process_view_sync(request, view_func, view_args, view_kwargs) + + self._middleware_patch = patch.object( + ApiGatewayMiddleware, "_process_view_match", _process_view_match + ) + self._middleware_patch.start() + + def tearDown(self) -> None: + self._middleware_patch.stop() + super().tearDown() + def test_through_api_gateway(self) -> None: if SiloMode.get_current_mode() == SiloMode.MONOLITH: return @@ -62,7 +88,7 @@ def test_through_api_gateway(self) -> None: status_code=201, ) - result = json.loads(resp.getvalue()) + result = json.loads(close_streaming_response(resp)) with assume_test_silo_mode(SiloMode.CELL): team = Team.objects.get(id=result["id"]) assert team.idp_provisioned diff --git a/tests/sentry/objectstore/endpoints/test_organization.py b/tests/sentry/objectstore/endpoints/test_organization.py index fb5eecffda7e43..d327fc4406c11c 100644 --- a/tests/sentry/objectstore/endpoints/test_organization.py +++ 
b/tests/sentry/objectstore/endpoints/test_organization.py @@ -1,14 +1,18 @@ import os from dataclasses import asdict +from unittest.mock import patch +import httpx import pytest import requests import zstandard from django.db import connections +from django.http import HttpResponse, StreamingHttpResponse from django.urls import reverse from objectstore_client import Client, RequestError, Session, Usecase from pytest_django.live_server_helper import LiveServer +from sentry.hybridcloud.apigateway_async import proxy as proxy_mod from sentry.silo.base import SiloMode, SingleProcessSiloModeState from sentry.testutils.asserts import assert_status_code from sentry.testutils.cases import TransactionTestCase @@ -180,7 +184,53 @@ def setUp(self) -> None: scope_list=["project:releases"], ) + #: some shenanigans to work around async/sync hell: + # - use a "one shot" httpx client, so that we're not bound previous + # no-more existing event loops + # - patch the middleware to consume original streamed response body + # before the loop gets closed/destroyed + class HTTPXOneShotClient: + def __init__(self): + self.inner = None + + def __getattr__(self, name): + return getattr(self.inner, name) + + def build_request(self, *args, **kwargs): + self.inner = httpx.AsyncClient() + return self.inner.build_request(*args, **kwargs) + + from sentry.hybridcloud.apigateway_async.middleware import ApiGatewayMiddleware + + _original_middleware = ApiGatewayMiddleware._process_view_inner + + async def _eager_process_view_inner(mw_self, request, view_func, view_args, view_kwargs): + resp = await _original_middleware(mw_self, request, view_func, view_args, view_kwargs) + if isinstance(resp, StreamingHttpResponse) and resp.is_async: + body = b"" + async for chunk in resp: + body += chunk + sync_resp = HttpResponse( + content=body, + status=resp.status_code, + content_type=resp.get("Content-Type"), + ) + for header, value in resp.items(): + if header.lower() != "content-type": + sync_resp[header] = 
value + return sync_resp + return resp + + self._apigateway_patch = patch.object(proxy_mod, "proxy_client", HTTPXOneShotClient()) + self._middleware_patch = patch.object( + ApiGatewayMiddleware, "_process_view_inner", _eager_process_view_inner + ) + self._apigateway_patch.start() + self._middleware_patch.start() + def tearDown(self) -> None: + self._middleware_patch.stop() + self._apigateway_patch.stop() for conn in connections.all(): conn.close() super().tearDown() @@ -205,8 +255,6 @@ def test_health(self) -> None: follow=True, ) assert response.status_code == 200 - # consume body to close connection - b"".join(response.streaming_content) # type: ignore[attr-defined] def test_full_cycle(self) -> None: config = asdict(test_region) @@ -225,7 +273,7 @@ def test_full_cycle(self) -> None: follow=True, ) assert_status_code(response, 201) - object_key = json.loads(b"".join(response.streaming_content))["key"] # type: ignore[attr-defined] + object_key = json.loads(response.content)["key"] assert object_key is not None response = self.client.get( @@ -234,8 +282,7 @@ def test_full_cycle(self) -> None: follow=True, ) assert_status_code(response, 200) - retrieved_data = b"".join(response.streaming_content) # type: ignore[attr-defined] - assert retrieved_data == b"test data" + assert response.content == b"test data" response = self.client.put( f"{base_url}{object_key}", @@ -245,7 +292,7 @@ def test_full_cycle(self) -> None: follow=True, ) assert_status_code(response, 200) - new_key = json.loads(b"".join(response.streaming_content))["key"] # type: ignore[attr-defined] + new_key = json.loads(response.content)["key"] assert new_key == object_key response = self.client.get( @@ -254,8 +301,7 @@ def test_full_cycle(self) -> None: follow=True, ) assert_status_code(response, 200) - retrieved = b"".join(response.streaming_content) # type: ignore[attr-defined] - assert retrieved == b"new data" + assert response.content == b"new data" response = self.client.delete( 
f"{base_url}{object_key}", @@ -263,8 +309,6 @@ def test_full_cycle(self) -> None: follow=True, ) assert_status_code(response, 204) - # consume body to close connection - b"".join(response.streaming_content) # type: ignore[attr-defined] response = self.client.get( f"{base_url}{object_key}", @@ -272,8 +316,6 @@ def test_full_cycle(self) -> None: follow=True, ) assert_status_code(response, 404) - # consume body to close connection - b"".join(response.streaming_content) # type: ignore[attr-defined] def test_roundtrip_compressed(self) -> None: config = asdict(test_region) @@ -297,7 +339,7 @@ def test_roundtrip_compressed(self) -> None: follow=True, ) assert_status_code(response, 201) - object_key = json.loads(b"".join(response.streaming_content))["key"] # type: ignore[attr-defined] + object_key = json.loads(response.content)["key"] assert object_key is not None response = self.client.get( @@ -306,5 +348,4 @@ def test_roundtrip_compressed(self) -> None: follow=True, ) assert_status_code(response, 200) - retrieved = b"".join(response.streaming_content) # type: ignore[attr-defined] - assert retrieved == data + assert response.content == data diff --git a/uv.lock b/uv.lock index 0c70205b032d8a..041cffa6e8c23d 100644 --- a/uv.lock +++ b/uv.lock @@ -796,6 +796,9 @@ pname = [ reload = [ { name = "watchfiles", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, ] +uvloop = [ + { name = "uvloop", marker = "(platform_python_implementation == 'CPython' and sys_platform == 'darwin') or (platform_python_implementation == 'CPython' and sys_platform == 'linux')" }, +] [[package]] name = "grpc-google-iam-v1" @@ -949,17 +952,16 @@ http2 = [ [[package]] name = "httpx" -version = "0.25.2" +version = "0.28.1" source = { registry = "https://pypi.devinfra.sentry.io/simple" } dependencies = [ { name = "anyio", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "certifi", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = 
"httpcore", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "idna", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, - { name = "sniffio", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, ] wheels = [ - { url = "https://pypi.devinfra.sentry.io/wheels/httpx-0.25.2-py3-none-any.whl", hash = "sha256:a05d3d052d9b2dfce0e3896636467f8a5342fb2b902c819428e1ac65413ca118" }, + { url = "https://pypi.devinfra.sentry.io/wheels/httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad" }, ] [[package]] @@ -2151,11 +2153,12 @@ dependencies = [ { name = "google-cloud-storage-transfer", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "google-crc32c", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "googleapis-common-protos", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, - { name = "granian", extra = ["pname", "reload"], marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "granian", extra = ["pname", "reload", "uvloop"], marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "grpc-google-iam-v1", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "grpcio", version = "1.67.0", source = { registry = "https://pypi.devinfra.sentry.io/simple" }, marker = "(python_full_version < '3.14' and sys_platform == 'darwin') or (python_full_version < '3.14' and sys_platform == 'linux')" }, { name = "grpcio", version = "1.75.1", source = { registry = "https://pypi.devinfra.sentry.io/simple" }, marker = "(python_full_version >= '3.14' and sys_platform == 'darwin') or (python_full_version >= '3.14' and sys_platform == 'linux')" }, { name = "hiredis", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "httpx", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "iso3166", marker = "sys_platform == 
'darwin' or sys_platform == 'linux'" }, { name = "jsonschema", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "lxml", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, @@ -2319,10 +2322,11 @@ requires-dist = [ { name = "google-cloud-storage-transfer", specifier = ">=1.17.0" }, { name = "google-crc32c", specifier = ">=1.6.0" }, { name = "googleapis-common-protos", specifier = ">=1.63.2" }, - { name = "granian", extras = ["pname", "reload"], specifier = ">=2.7" }, + { name = "granian", extras = ["pname", "reload", "uvloop"], specifier = ">=2.7" }, { name = "grpc-google-iam-v1", specifier = ">=0.13.1" }, { name = "grpcio", specifier = ">=1.67.0" }, { name = "hiredis", specifier = ">=2.3.2" }, + { name = "httpx", specifier = ">=0.28.1" }, { name = "iso3166", specifier = ">=2.1.1" }, { name = "jsonschema", specifier = ">=4.20.0" }, { name = "lxml", specifier = ">=5.3.0" }, @@ -3090,6 +3094,17 @@ socks = [ { name = "pysocks", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, ] +[[package]] +name = "uvloop" +version = "0.21.0" +source = { registry = "https://pypi.devinfra.sentry.io/simple" } +wheels = [ + { url = "https://pypi.devinfra.sentry.io/wheels/uvloop-0.21.0-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:bfd55dfcc2a512316e65f16e503e9e450cab148ef11df4e4e679b5e8253a5281" }, + { url = "https://pypi.devinfra.sentry.io/wheels/uvloop-0.21.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:787ae31ad8a2856fc4e7c095341cccc7209bd657d0e71ad0dc2ea83c4a6fa8af" }, + { url = "https://pypi.devinfra.sentry.io/wheels/uvloop-0.21.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5ee4d4ef48036ff6e5cfffb09dd192c7a5027153948d85b8da7ff705065bacc6" }, + { url = "https://pypi.devinfra.sentry.io/wheels/uvloop-0.21.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f3df876acd7ec037a3d005b3ab85a7e4110422e4d9c1571d4fc89b0fc41b6816" }, +] + [[package]] name = 
"virtualenv" version = "20.26.6"