diff --git a/CHANGELOG.md b/CHANGELOG.md index b74556d..8eeb194 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added +- `requests_auth.OAuth2DeviceCode` handling [OAuth 2.0 Device Authorization Grant](https://datatracker.ietf.org/doc/html/rfc8628) (device code flow). + The following parameters allow customisation of this flow: + - `prompt_callback`: A function called with `verification_uri` and `user_code` once the device authorization request has been made. By default a `print` statement prompts the user to navigate to the URI and enter the code. + - `prefer_complete_verification_url`: When `True` and the server provides a `verification_uri_complete`, that URL is used instead so the user does not need to enter the code separately. + - `authorization_pending_status_code` / `slow_down_status_code`: Status codes used to detect the `authorization_pending` and `slow_down` polling responses respectively. + - `timeout`: Maximum number of seconds to wait for the user to complete authentication. Defaults to 3 minutes. +- `requests_auth.Auth0DeviceCode` provides specific support for the [Auth0](https://auth0.com/docs/get-started/authentication-and-authorization-flow/device-authorization-flow/call-your-api-using-the-device-authorization-flow) device code flow. +- `requests_auth.EntraIDDeviceCode` provides specific support for the [Microsoft Entra ID](https://learn.microsoft.com/en-us/entra/identity-platform/v2-oauth2-device-code) device code flow. + ## [8.0.0] - 2024-06-18 ### Added - Adding explicit support for Python `3.12`. diff --git a/requests_auth/__init__.py b/requests_auth/__init__.py index c19a25b..3c606dc 100644 --- a/requests_auth/__init__.py +++ b/requests_auth/__init__.py @@ -16,6 +16,7 @@ OAuth2AuthorizationCodePKCE, OktaAuthorizationCodePKCE, ) +from requests_auth._oauth2.device_code import OAuth2DeviceCode, Auth0DeviceCode, EntraIDDeviceCode from requests_auth._oauth2.client_credentials import ( OAuth2ClientCredentials, OktaClientCredentials, @@ -52,6 +53,9 @@ "DisplaySettings", "OAuth2AuthorizationCodePKCE", "OktaAuthorizationCodePKCE", + "OAuth2DeviceCode", + "Auth0DeviceCode", + "EntraIDDeviceCode", "OAuth2Implicit", "OktaImplicit", "OktaImplicitIdToken", diff --git a/requests_auth/_oauth2/device_code.py b/requests_auth/_oauth2/device_code.py new file mode 100644 index 0000000..c23fa5e --- /dev/null +++ b/requests_auth/_oauth2/device_code.py @@ -0,0 +1,281 @@ +import time +import warnings + +from hashlib import sha512 +from typing import cast, Tuple + +import requests + +from requests_auth._errors import InvalidGrantRequest, TimeoutOccurred, GrantNotProvided +from requests_auth._authentication import SupportMultiAuth +from requests_auth._oauth2.common import ( + OAuth2, + request_new_grant_with_post, + _content_from_response, +) + +def prompt_user_to_authenticate(verification_uri: str, user_code: str) -> None: + print("Device Code login request:") + print( + f"Navigate to {verification_uri} on any device and enter the device code: {user_code}" + ) + +class OAuth2DeviceCode(requests.auth.AuthBase, SupportMultiAuth): + """ + Device Code Grant + + Describes an OAuth 2 device code flow authentication. + More details can be found in https://datatracker.ietf.org/doc/html/rfc8628 + """ + + def __init__( + self, authorization_url: str, token_url: str, client_id: str, **kwargs + ) -> None: + """ + :param authorization_url: OAuth 2 authorization URL. + :param token_url: OAuth 2 token URL. + :param client_id: Resource owner username. + :param timeout: Maximum amount of seconds to wait for a token to be received once requested. + Wait for 3 minutes by default. + :param prefer_complete_verification_url: If supported, return the complete verification URL to avoid the need + to enter the code. If false or not supported, the device code will be returned. + :param header_name: Name of the header field used to send token. + Token will be sent in Authorization header field by default. + :param header_value: Format used to send the token value. + "{token}" must be present as it will be replaced by the actual token. + Token will be sent as "Bearer {token}" by default. + :param scope: Scope parameter sent to token URL as body. Can also be a list of scopes. Not sent by default. + :param token_field_name: Field name containing the token. access_token by default. + :param authorization_pending_status_code: Status code returned if server is waiting for authorization. Default + value is 400 "Bad Request". + :param slow_down_status_code: Status code returned if the server requests the client wait longer between poll + requests. Default value is 400 "Bad Request". + :param early_expiry: Number of seconds before actual token expiry where token will be considered as expired. + Default to 30 seconds to ensure token will not expire between the time of retrieval and the time the request + reaches the actual server. Set it to 0 to deactivate this feature and use the same token until actual expiry. + :param session: requests.Session instance that will be used to request the token. + Use it to provide a custom proxying rule for instance. + :param prompt_callback: A function that will be called with the verification_uri and user_code as parameters + once the authorization request has been made. By default, a print statement will be used to prompt the + user to authenticate with their browser. + :param kwargs: all additional authorization parameters that should be put as query parameter in the token URL. + """ + self.authorization_url = authorization_url + if not self.authorization_url: + raise Exception("Authorization URL is mandatory.") + self.token_url = token_url + if not self.token_url: + raise Exception("Token URL is mandatory.") + self.client_id = client_id + if not self.client_id: + raise Exception("Client ID is mandatory.") + + self.header_name = kwargs.pop("header_name", None) or "Authorization" + self.header_value = kwargs.pop("header_value", None) or "Bearer {token}" + if "{token}" not in self.header_value: + raise Exception("header_value parameter must contains {token}.") + + self.token_field_name = kwargs.pop("token_field_name", None) or "access_token" + self.authorization_pending_status_code = kwargs.pop( + "authorization_pending_status_code", 400 + ) + self.slow_down_status_code = kwargs.pop("slow_down_status_code", 400) + self.early_expiry = float(kwargs.pop("early_expiry", None) or 30.0) + self.prefer_complete_verification_url = kwargs.pop( + "prefer_complete_verification_url", False + ) + + self.prompt_callback = kwargs.pop("prompt_callback", None) or prompt_user_to_authenticate + + # Time is expressed in seconds + self.timeout = int(kwargs.pop("timeout", None) or 180) + + self.session = kwargs.pop("session", None) or requests.Session() + self.session.timeout = self.timeout + + # As described in https://datatracker.ietf.org/doc/html/rfc8628#section-3.1 + self.authorization_data = {"client_id": self.client_id} + scope = kwargs.pop("scope", None) + if scope: + self.authorization_data["scope"] = ( + " ".join(scope) if isinstance(scope, list) else scope + ) + self.authorization_data.update(kwargs) + self.state = sha512( + (self.authorization_url + self.token_url + self.client_id).encode( + "unicode_escape" + ) + ).hexdigest() + + # As described in https://tools.ietf.org/html/rfc6749#section-6 + self.refresh_data = {"grant_type": "refresh_token"} + self.refresh_data.update(kwargs) + + self.additional_data = kwargs + + def __call__(self, r: requests.Request) -> requests.Request: + token = OAuth2.token_cache.get_token( + key=self.state, + early_expiry=self.early_expiry, + on_missing_token=self.request_new_token, + on_expired_token=self.refresh_token, + ) + r.headers[self.header_name] = self.header_value.format(token=token) + return r + + def request_new_token(self) -> Tuple[str, str] | Tuple[str, str, int, str]: + # As described in https://datatracker.ietf.org/doc/html/rfc8628#section-3.1 + + authorization_response: requests.Response = self.session.post( + self.authorization_url, + data=self.authorization_data, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) + + if not authorization_response: + raise InvalidGrantRequest(authorization_response) + + response_data = authorization_response.json() + device_code = response_data["device_code"] + user_code = response_data["user_code"] + verification_uri = response_data["verification_uri"] + request_expires_in = response_data["expires_in"] + + verification_uri_complete = response_data.get("verification_uri_complete", None) + if ( + self.prefer_complete_verification_url + and verification_uri_complete is not None + ): + verification_uri = verification_uri_complete + + interval = response_data.get("interval", 5) + start_time = time.time() + + self.prompt_callback(verification_uri, user_code) + + token_request_data = { + "grant_type": "urn:ietf:params:oauth:grant-type:device_code", + "device_code": device_code, + "client_id": self.client_id, + } + token_request_data.update(self.additional_data) + time.sleep(interval) + while time.time() - start_time < request_expires_in: + token_response = self.session.post( + self.token_url, + data=token_request_data, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) + + if token_response.status_code == 200: + # User has authenticated, a token is returned + content = _content_from_response(token_response) + token = content.get("access_token", None) + if not token: + raise GrantNotProvided("access_token", content) + token_expires_in = content.get("expires_in", None) + return ( + ( + self.state, + cast(str, content.get("access_token")), + cast(int, token_expires_in), + cast(str, content.get("refresh_token")), + ) + if token_expires_in is not None + else (self.state, cast(str, content.get("access_token"))) + ) + if token_response.status_code == self.authorization_pending_status_code: + error_content = _content_from_response(token_response) + error_type = error_content.get("error", None) + if error_type == "authorization_pending": + time.sleep(interval) + continue + if token_response.status_code == self.slow_down_status_code: + error_content = _content_from_response(token_response) + error_type = error_content.get("error", None) + if error_type == "slow_down": + interval += 5 + time.sleep(interval) + continue + raise InvalidGrantRequest(token_response) + raise TimeoutOccurred(request_expires_in) + + def refresh_token(self, refresh_token: str) -> Tuple[str, str, int, str]: + # As described in https://tools.ietf.org/html/rfc6749#section-6 + self.refresh_data["refresh_token"] = refresh_token + token, expires_in, refresh_token = request_new_grant_with_post( + self.token_url, + self.refresh_data, + self.token_field_name, + self.timeout, + self.session, + ) + return self.state, token, expires_in, refresh_token + + +class Auth0DeviceCode(OAuth2DeviceCode): + """ + Describes an Auth0 (OAuth 2) Device code flow authentication request. + """ + + def __init__(self, domain: str, client_id: str, audience: str, **kwargs) -> None: + """ + :param domain: Auth0 domain, (like "https://org.eu.auth0.com") + :param client_id: Client ID + :param audience: API Audience, (like https://org-api-audience") + :param scope: Scope parameter sent in query. Can also be a list of scopes. Request 'openid' by default. + :param timeout: Maximum amount of seconds to wait for a token to be received once requested. + Wait for 3 minutes by default. + :param prefer_complete_verification_url: If supported, return the complete verification URL to avoid the need + to enter the code. If false or not supported, the device code will be returned. + :param early_expiry: Number of seconds before actual token expiry where token will be considered as expired. + Default to 30 seconds to ensure token will not expire between the time of retrieval and the time the request + reaches the actual server. Set it to 0 to deactivate this feature and use the same token until actual expiry. + :param session: requests.Session instance that will be used to request the token. + Use it to provide a custom proxying rule for instance. + :param kwargs: all additional authorization parameters that should be put as query parameter in the token URL. + """ + stripped_domain = domain.rstrip("/") + scopes = kwargs.pop("scope", "openid") + kwargs["scope"] = " ".join(scopes) if isinstance(scopes, list) else scopes + super().__init__( + authorization_url=f"{stripped_domain}/oauth/device/code", + token_url=f"{stripped_domain}/oauth/token", + client_id=client_id, + audience=audience, + authorization_pending_status_code=403, + slow_down_status_code=403, + **kwargs, + ) + +class EntraIDDeviceCode(OAuth2DeviceCode): + """ + Describes an Entra ID (OAuth 2) Device code flow authentication request. + """ + + def __init__(self, tenant_id: str, client_id: str, **kwargs) -> None: + """ + :param tenant_id: Entra ID tenant ID, (like "00000000-0000-0000-0000-000000000000") + :param client_id: Client ID + :param scope: Scope parameter sent in query. Can also be a list of scopes. Request 'openid' by default. + :param timeout: Maximum amount of seconds to wait for a token to be received once requested. + Wait for 3 minutes by default. + :param early_expiry: Number of seconds before actual token expiry where token will be considered as expired. + Default to 30 seconds to ensure token will not expire between the time of retrieval and the time the request + reaches the actual server. Set it to 0 to deactivate this feature and use the same token until actual expiry. + :param session: requests.Session instance that will be used to request the token. + Use it to provide a custom proxying rule for instance. + :param kwargs: all additional authorization parameters that should be put as query parameter in the token URL. + """ + if "prefer_complete_verification_url" in kwargs: + warnings.warn("prefer_complete_verification_url parameter is not supported by Microsoft Entra ID and will be ignored.") + scopes = kwargs.pop("scope", "openid") + kwargs["scope"] = " ".join(scopes) if isinstance(scopes, list) else scopes + super().__init__( + authorization_url=f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/devicecode", + token_url=f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token", + client_id=client_id, + authorization_pending_status_code=400, + slow_down_status_code=400, + **kwargs, + ) \ No newline at end of file diff --git a/tests/oauth2/device_code/__init__.py b/tests/oauth2/device_code/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/oauth2/device_code/test_oauth2_device_code_auth0.py b/tests/oauth2/device_code/test_oauth2_device_code_auth0.py new file mode 100644 index 0000000..04ba1a5 --- /dev/null +++ b/tests/oauth2/device_code/test_oauth2_device_code_auth0.py @@ -0,0 +1,575 @@ +import uuid +from unittest.mock import patch, call + +from responses import RequestsMock +from responses.matchers import urlencoded_params_matcher, header_matcher +import pytest +import requests + +import requests_auth +from requests_auth import Auth0DeviceCode, TimeoutOccurred +from requests_auth.testing import token_cache # noqa: F401 +from tests.oauth2.device_code.test_outh2_device_code import TokenResponder + +DEVICE_CODE_GRANT = "urn:ietf:params:oauth:grant-type:device_code" + + +def test_oauth2_device_code_flow_uses_provided_session( + token_cache, responses: RequestsMock +): + session = requests.Session() + session.headers.update({"x-test": "Test value"}) + auth = requests_auth.Auth0DeviceCode( + "http://provide_code", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + audience="api-audience", + session=session, + ) + responses.post( + "http://provide_code/oauth/device/code", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJB-MJHT", + "verification_uri": "https://example.com/device", + "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + "audience": "api-audience", + "scope": "openid", + } + ), + header_matcher({"x-test": "Test value"}), + ], + ) + responses.post( + "http://provide_code/oauth/token", + json={ + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "read_data", + }, + match=[ + urlencoded_params_matcher( + { + "audience": "api-audience", + "grant_type": DEVICE_CODE_GRANT, + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + header_matcher({"x-test": "Test value"}), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_oauth2_device_code_flow_token_is_expired_after_30_seconds_by_default( + token_cache, responses: RequestsMock +): + auth = requests_auth.Auth0DeviceCode( + "http://provide_code", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + audience="api-audience", + ) + # Add a token that expires in 29 seconds, so should be considered as expired when issuing the request + token_cache._add_token( + key="ace95e9cce00f1807eaea783519b8b6db1f04304ca716898830285e05001cbd4db0a9efba441f42b29cd763b7f023daeff773b03b568f591e6ebfde17ec2b7b6", + token="2YotnFZFEjr1zCsicMWpAA", + expiry=requests_auth._oauth2.tokens._to_expiry(expires_in=29), + ) + # Meaning a new one will be requested + responses.post( + "http://provide_code/oauth/device/code", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJB-MJHT", + "verification_uri": "https://example.com/device", + "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + "audience": "api-audience", + "scope": "openid", + } + ), + ], + ) + responses.post( + "http://provide_code/oauth/token", + json={ + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "read_data", + }, + match=[ + urlencoded_params_matcher( + { + "audience": "api-audience", + "grant_type": DEVICE_CODE_GRANT, + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_oauth2_device_code_flow_token_custom_expiry( + token_cache, responses: RequestsMock +): + auth = requests_auth.Auth0DeviceCode( + "http://provide_code", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + audience="api-audience", + early_expiry=28, + ) + # Add a token that expires in 29 seconds, so should be considered as not expired when issuing the request + token_cache._add_token( + key="ace95e9cce00f1807eaea783519b8b6db1f04304ca716898830285e05001cbd4db0a9efba441f42b29cd763b7f023daeff773b03b568f591e6ebfde17ec2b7b6", + token="2YotnFZFEjr1zCsicMWpAA", + expiry=requests_auth._oauth2.tokens._to_expiry(expires_in=29), + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_refresh_token(token_cache, responses: RequestsMock): + auth = requests_auth.Auth0DeviceCode( + "http://provide_code", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + audience="api-audience", + ) + # Setup initial authentication responses + responses.post( + "http://provide_code/oauth/device/code", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJB-MJHT", + "verification_uri": "https://example.com/device", + "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + "audience": "api-audience", + "scope": "openid", + } + ), + ], + ) + responses.post( + "http://provide_code/oauth/token", + json={ + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 0, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "openid", + }, + match=[ + urlencoded_params_matcher( + { + "audience": "api-audience", + "grant_type": DEVICE_CODE_GRANT, + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + # Setup auth handler + requests.get("http://authorized_only", auth=auth) + + # Response for refresh token grant + responses.post( + "http://provide_code/oauth/token", + json={ + "access_token": "rVR7Syg5bjZtZYjbZIW", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "openid", + }, + match=[ + urlencoded_params_matcher( + { + "grant_type": "refresh_token", + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "audience": "api-audience", + } + ), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer rVR7Syg5bjZtZYjbZIW"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_refresh_token_invalid(token_cache, responses: RequestsMock): + auth = requests_auth.Auth0DeviceCode( + "http://provide_code", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + audience="api-audience", + ) + # Setup initial authentication responses + responses.post( + "http://provide_code/oauth/device/code", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJB-MJHT", + "verification_uri": "https://example.com/device", + "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + "audience": "api-audience", + "scope": "openid", + } + ), + ], + ) + responses.post( + "http://provide_code/oauth/token", + json={ + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 0, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "openid", + }, + match=[ + urlencoded_params_matcher( + { + "audience": "api-audience", + "grant_type": DEVICE_CODE_GRANT, + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + # response for refresh token grant + responses.post( + "http://provide_code/oauth/token", + json={"error": "invalid_request"}, + status=400, + match=[ + urlencoded_params_matcher( + { + "grant_type": "refresh_token", + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "audience": "api-audience", + } + ), + ], + ) + + # if refreshing the token fails, fallback to requesting a new token + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_refresh_token_access_token_not_expired(token_cache, responses: RequestsMock): + auth = requests_auth.Auth0DeviceCode( + "http://provide_code", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + audience="api-audience", + ) + # Setup initial authentication responses + responses.post( + "http://provide_code/oauth/device/code", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJB-MJHT", + "verification_uri": "https://example.com/device", + "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + "audience": "api-audience", + "scope": "openid", + } + ), + ], + ) + responses.post( + "http://provide_code/oauth/token", + json={ + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "openid", + }, + match=[ + urlencoded_params_matcher( + { + "audience": "api-audience", + "grant_type": DEVICE_CODE_GRANT, + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + # expect Bearer token to remain the same + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_empty_token_is_invalid(token_cache, responses: RequestsMock): + auth = requests_auth.Auth0DeviceCode( + "http://provide_code", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + audience="api-audience", + ) + # Setup initial authentication responses + responses.post( + "http://provide_code/oauth/device/code", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJB-MJHT", + "verification_uri": "https://example.com/device", + "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + "audience": "api-audience", + "scope": "openid", + } + ), + ], + ) + responses.post( + "http://provide_code/oauth/token", + json={ + "access_token": "", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "openid", + }, + ) + with pytest.raises(requests_auth.GrantNotProvided) as exception_info: + requests.get("http://authorized_only", auth=auth) + assert ( + str(exception_info.value) + == "access_token not provided within {'access_token': '', 'token_type': 'example', 'expires_in': 3600, 'refresh_token': 'tGzv3JOkF0XG5Qx2TlKWIA', 'scope': 'openid'}." + ) + assert isinstance(exception_info.value, requests_auth.RequestsAuthException) + assert isinstance(exception_info.value, requests.RequestException) + + +class TestPollingBehaviour: + _domain = "http://provide_code" + _authorization_url = f"{_domain}/oauth/device/code" + _token_url = f"{_domain}/oauth/token" + _device_code = "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS" + _api_audience = "api-audience" + + @pytest.fixture(scope="function") + def auth(self) -> tuple[Auth0DeviceCode, str]: + client_id = str(uuid.uuid4()) + return ( + requests_auth.Auth0DeviceCode( + domain=self._domain, + client_id=client_id, + audience=self._api_audience, + ), + client_id, + ) + + def _configure_authorization_response( + self, + response_mock: RequestsMock, + client_id: str, + expires_in: int, + interval: int, + ) -> None: + response_mock.post( + "http://provide_code/oauth/device/code", + json={ + "device_code": self._device_code, + "user_code": "WDJB-MJHT", + "verification_uri": "https://example.com/device", + "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", + "expires_in": expires_in, + "interval": interval, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": client_id, + "audience": "api-audience", + "scope": "openid", + } + ), + ], + ) + + def assert_sleep_calls(self, patched_call, sleep_calls: tuple[int, ...]) -> None: + assert patched_call.call_count == len(sleep_calls) + patched_call.assert_has_calls([call(item) for item in sleep_calls]) + + def test_authorization_polls_expected_count( + self, responses: RequestsMock, auth: Auth0DeviceCode + ) -> None: + device_code_auth, client_id = auth + self._configure_authorization_response(responses, client_id, 60, 5) + responder = TokenResponder(responses, self._token_url, status_code=403) + + # Two authorization_pending with 5s expiry + responder.configure_responses( + ("authorization_pending", "authorization_pending", "ok") + ) + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + with patch("time.sleep", return_value=None) as patched_sleep: + requests.get("http://authorized_only", auth=device_code_auth) + + responder.assert_all_calls() + self.assert_sleep_calls(patched_sleep, (5, 5)) + + def test_authorization_polls_expected_count_with_slow_down( + self, responses: RequestsMock, auth: Auth0DeviceCode + ) -> None: + device_code_auth, client_id = auth + self._configure_authorization_response(responses, client_id, 60, 5) + responder = TokenResponder(responses, self._token_url, status_code=403) + + # Two authorization_pending with 5s expiry + responder.configure_responses(("authorization_pending", "slow_down", "ok")) + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + with patch("time.sleep", return_value=None) as patched_sleep: + requests.get("http://authorized_only", auth=device_code_auth) + + responder.assert_all_calls() + self.assert_sleep_calls(patched_sleep, (5, 10)) + + def test_authorization_slow_down_persists( + self, responses: RequestsMock, auth: Auth0DeviceCode + ) -> None: + device_code_auth, client_id = auth + self._configure_authorization_response(responses, client_id, 60, 5) + responder = TokenResponder(responses, self._token_url, status_code=403) + + responder.configure_responses( + ( + "authorization_pending", + "slow_down", + "authorization_pending", + "authorization_pending", + "ok", + ) + ) + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + with patch("time.sleep", return_value=None) as patched_sleep: + requests.get("http://authorized_only", auth=device_code_auth) + + responder.assert_all_calls() + self.assert_sleep_calls(patched_sleep, (5, 10, 10, 10)) + + def test_authorization_times_out( + self, responses: RequestsMock, auth: Auth0DeviceCode + ) -> None: + device_code_auth, client_id = auth + self._configure_authorization_response(responses, client_id, 9, 5) + responder = TokenResponder(responses, self._token_url, status_code=403) + + # Two authorization_pending with 5s expiry + responder.configure_responses( + ("authorization_pending", "authorization_pending") + ) + + with pytest.raises( + TimeoutOccurred, + match="User authentication was not received within 9 seconds.", + ): + requests.get("http://authorized_only", auth=device_code_auth) diff --git a/tests/oauth2/device_code/test_oauth2_device_code_entraid.py b/tests/oauth2/device_code/test_oauth2_device_code_entraid.py new file mode 100644 index 0000000..f88d2c8 --- /dev/null +++ b/tests/oauth2/device_code/test_oauth2_device_code_entraid.py @@ -0,0 +1,546 @@ +import uuid +from unittest.mock import patch, call + +from responses import RequestsMock +from responses.matchers import urlencoded_params_matcher, header_matcher +import pytest +import requests + +import requests_auth +from requests_auth import EntraIDDeviceCode, TimeoutOccurred +from requests_auth.testing import token_cache # noqa: F401 +from tests.oauth2.device_code.test_outh2_device_code import TokenResponder + +DEVICE_CODE_GRANT = "urn:ietf:params:oauth:grant-type:device_code" + + +def test_oauth2_device_code_flow_uses_provided_session( + token_cache, responses: RequestsMock +): + session = requests.Session() + session.headers.update({"x-test": "Test value"}) + auth = requests_auth.EntraIDDeviceCode( + "6d655498-2fe7-413e-9443-4f108184be38", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + session=session, + ) + responses.post( + "https://login.microsoftonline.com/6d655498-2fe7-413e-9443-4f108184be38/oauth2/v2.0/devicecode", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJBMJHT", + "verification_uri": "https://example.com/device", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + "scope": "openid" + } + ), + header_matcher({"x-test": "Test value"}), + ], + ) + responses.post( + "https://login.microsoftonline.com/6d655498-2fe7-413e-9443-4f108184be38/oauth2/v2.0/token", + json={ + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "openid", + }, + match=[ + urlencoded_params_matcher( + { + "grant_type": DEVICE_CODE_GRANT, + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + header_matcher({"x-test": "Test value"}), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_oauth2_device_code_flow_token_is_expired_after_30_seconds_by_default( + token_cache, responses: RequestsMock +): + auth = requests_auth.EntraIDDeviceCode( + "6d655498-2fe7-413e-9443-4f108184be38", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + ) + # Add a token that expires in 29 seconds, so should be considered as expired when issuing the request + token_cache._add_token( + key="ace95e9cce00f1807eaea783519b8b6db1f04304ca716898830285e05001cbd4db0a9efba441f42b29cd763b7f023daeff773b03b568f591e6ebfde17ec2b7b6", + token="2YotnFZFEjr1zCsicMWpAA", + expiry=requests_auth._oauth2.tokens._to_expiry(expires_in=29), + ) + # Meaning a new one will be requested + responses.post( + "https://login.microsoftonline.com/6d655498-2fe7-413e-9443-4f108184be38/oauth2/v2.0/devicecode", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJBMJHT", + "verification_uri": "https://example.com/device", + "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + "scope": "openid", + } + ), + ], + ) + responses.post( + "https://login.microsoftonline.com/6d655498-2fe7-413e-9443-4f108184be38/oauth2/v2.0/token", + json={ + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "openid", + }, + match=[ + urlencoded_params_matcher( + { + "grant_type": DEVICE_CODE_GRANT, + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_oauth2_device_code_flow_token_custom_expiry( + token_cache, responses: RequestsMock +): + auth = requests_auth.EntraIDDeviceCode( + "6d655498-2fe7-413e-9443-4f108184be38", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + early_expiry=28, + ) + # Add a token that expires in 29 seconds, so should be considered as not expired when issuing the request + token_cache._add_token( + key="ba2535d60bf38bde662eba49add795cc707c5f247b36d198fc98bc2dd82b9f1736ce63a27442ee4761982d739ab67b7ad92efec3dd8b04f613766904a0ad7e20", + token="2YotnFZFEjr1zCsicMWpAA", + expiry=requests_auth._oauth2.tokens._to_expiry(expires_in=29), + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_refresh_token(token_cache, responses: RequestsMock): + auth = requests_auth.EntraIDDeviceCode( + "6d655498-2fe7-413e-9443-4f108184be38", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + ) + # Setup initial authentication responses + responses.post( + "https://login.microsoftonline.com/6d655498-2fe7-413e-9443-4f108184be38/oauth2/v2.0/devicecode", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJBMJHT", + "verification_uri": "https://example.com/device", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + "scope": "openid", + } + ), + ], + ) + responses.post( + "https://login.microsoftonline.com/6d655498-2fe7-413e-9443-4f108184be38/oauth2/v2.0/token", + json={ + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 0, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "openid", + }, + match=[ + urlencoded_params_matcher( + { + "grant_type": DEVICE_CODE_GRANT, + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + # Setup auth handler + requests.get("http://authorized_only", auth=auth) + + # Response for refresh token grant + responses.post( + "https://login.microsoftonline.com/6d655498-2fe7-413e-9443-4f108184be38/oauth2/v2.0/token", + json={ + "access_token": "rVR7Syg5bjZtZYjbZIW", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "openid", + }, + match=[ + urlencoded_params_matcher( + { + "grant_type": "refresh_token", + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + } + ), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer rVR7Syg5bjZtZYjbZIW"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_refresh_token_invalid(token_cache, responses: RequestsMock): + auth = requests_auth.EntraIDDeviceCode( + "6d655498-2fe7-413e-9443-4f108184be38", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + ) + # Setup initial authentication responses + responses.post( + "https://login.microsoftonline.com/6d655498-2fe7-413e-9443-4f108184be38/oauth2/v2.0/devicecode", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJBMJHT", + "verification_uri": "https://example.com/device", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + "scope": "openid", + } + ), + ], + ) + responses.post( + "https://login.microsoftonline.com/6d655498-2fe7-413e-9443-4f108184be38/oauth2/v2.0/token", + json={ + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 0, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "openid", + }, + match=[ + urlencoded_params_matcher( + { + "grant_type": DEVICE_CODE_GRANT, + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + # response for refresh token grant + responses.post( + "https://login.microsoftonline.com/6d655498-2fe7-413e-9443-4f108184be38/oauth2/v2.0/token", + json={"error": "invalid_request"}, + status=400, + match=[ + urlencoded_params_matcher( + { + "grant_type": "refresh_token", + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + } + ), + ], + ) + + # if refreshing the token fails, fallback to requesting a new token + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_refresh_token_access_token_not_expired(token_cache, responses: RequestsMock): + auth = requests_auth.EntraIDDeviceCode( + "6d655498-2fe7-413e-9443-4f108184be38", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + ) + # Setup initial authentication responses + responses.post( + "https://login.microsoftonline.com/6d655498-2fe7-413e-9443-4f108184be38/oauth2/v2.0/devicecode", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJBMJHT", + "verification_uri": "https://example.com/device", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + "scope": "openid", + } + ), + ], + ) + responses.post( + "https://login.microsoftonline.com/6d655498-2fe7-413e-9443-4f108184be38/oauth2/v2.0/token", + json={ + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "openid", + }, + match=[ + urlencoded_params_matcher( + { + "grant_type": DEVICE_CODE_GRANT, + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + # expect Bearer token to remain the same + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_empty_token_is_invalid(token_cache, responses: RequestsMock): + auth = requests_auth.EntraIDDeviceCode( + "6d655498-2fe7-413e-9443-4f108184be38", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + ) + # Setup initial authentication responses + responses.post( + "https://login.microsoftonline.com/6d655498-2fe7-413e-9443-4f108184be38/oauth2/v2.0/devicecode", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJBMJHT", + "verification_uri": "https://example.com/device", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + "scope": "openid", + } + ), + ], + ) + responses.post( + "https://login.microsoftonline.com/6d655498-2fe7-413e-9443-4f108184be38/oauth2/v2.0/token", + json={ + "access_token": "", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "openid", + }, + ) + with pytest.raises(requests_auth.GrantNotProvided) as exception_info: + requests.get("http://authorized_only", auth=auth) + assert ( + str(exception_info.value) + == "access_token not provided within {'access_token': '', 'token_type': 'example', 'expires_in': 3600, 'refresh_token': 'tGzv3JOkF0XG5Qx2TlKWIA', 'scope': 'openid'}." + ) + assert isinstance(exception_info.value, requests_auth.RequestsAuthException) + assert isinstance(exception_info.value, requests.RequestException) + + +class TestPollingBehaviour: + _tenant_id = "6d655498-2fe7-413e-9443-4f108184be38" + _authorization_url = f"https://login.microsoftonline.com/{_tenant_id}/oauth2/v2.0/devicecode" + _token_url = f"https://login.microsoftonline.com/{_tenant_id}/oauth2/v2.0/token" + _device_code = "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS" + + @pytest.fixture(scope="function") + def auth(self) -> tuple[EntraIDDeviceCode, str]: + client_id = str(uuid.uuid4()) + return ( + requests_auth.EntraIDDeviceCode( + tenant_id=self._tenant_id, + client_id=client_id, + ), + client_id, + ) + + def _configure_authorization_response( + self, + response_mock: RequestsMock, + client_id: str, + expires_in: int, + interval: int, + ) -> None: + response_mock.post( + self._authorization_url, + json={ + "device_code": self._device_code, + "user_code": "WDJBMJHT", + "verification_uri": "https://example.com/device", + "expires_in": expires_in, + "interval": interval, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": client_id, + "scope": "openid", + } + ), + ], + ) + + def assert_sleep_calls(self, patched_call, sleep_calls: tuple[int, ...]) -> None: + assert patched_call.call_count == len(sleep_calls) + patched_call.assert_has_calls([call(item) for item in sleep_calls]) + + def test_authorization_polls_expected_count( + self, responses: RequestsMock, auth: EntraIDDeviceCode + ) -> None: + device_code_auth, client_id = auth + self._configure_authorization_response(responses, client_id, 60, 5) + responder = TokenResponder(responses, self._token_url, status_code=400) + + # Two authorization_pending with 5s expiry + responder.configure_responses( + ("authorization_pending", "authorization_pending", "ok") + ) + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + with patch("time.sleep", return_value=None) as patched_sleep: + requests.get("http://authorized_only", auth=device_code_auth) + + responder.assert_all_calls() + self.assert_sleep_calls(patched_sleep, (5, 5)) + + def test_authorization_polls_expected_count_with_slow_down( + self, responses: RequestsMock, auth: EntraIDDeviceCode + ) -> None: + device_code_auth, client_id = auth + self._configure_authorization_response(responses, client_id, 60, 5) + responder = TokenResponder(responses, self._token_url, status_code=400) + + # Two authorization_pending with 5s expiry + responder.configure_responses(("authorization_pending", "slow_down", "ok")) + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + with patch("time.sleep", return_value=None) as patched_sleep: + requests.get("http://authorized_only", auth=device_code_auth) + + responder.assert_all_calls() + self.assert_sleep_calls(patched_sleep, (5, 10)) + + def test_authorization_slow_down_persists( + self, responses: RequestsMock, auth: EntraIDDeviceCode + ) -> None: + device_code_auth, client_id = auth + self._configure_authorization_response(responses, client_id, 60, 5) + responder = TokenResponder(responses, self._token_url, status_code=400) + + responder.configure_responses( + ( + "authorization_pending", + "slow_down", + "authorization_pending", + "authorization_pending", + "ok", + ) + ) + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + with patch("time.sleep", return_value=None) as patched_sleep: + requests.get("http://authorized_only", auth=device_code_auth) + + responder.assert_all_calls() + self.assert_sleep_calls(patched_sleep, (5, 10, 10, 10)) + + def test_authorization_times_out( + self, responses: RequestsMock, auth: EntraIDDeviceCode + ) -> None: + device_code_auth, client_id = auth + self._configure_authorization_response(responses, client_id, 9, 5) + responder = TokenResponder(responses, self._token_url, status_code=400) + + # Two authorization_pending with 5s expiry + responder.configure_responses( + ("authorization_pending", "authorization_pending") + ) + + with pytest.raises( + TimeoutOccurred, + match="User authentication was not received within 9 seconds.", + ): + requests.get("http://authorized_only", auth=device_code_auth) diff --git a/tests/oauth2/device_code/test_outh2_device_code.py b/tests/oauth2/device_code/test_outh2_device_code.py new file mode 100644 index 0000000..7e7d96b --- /dev/null +++ b/tests/oauth2/device_code/test_outh2_device_code.py @@ -0,0 +1,1019 @@ +import json +import uuid +from typing import Literal, Any, Iterable +from unittest.mock import patch, call + +from responses import RequestsMock +from responses.matchers import urlencoded_params_matcher, header_matcher +import pytest +import requests + +import requests_auth +from requests_auth import OAuth2DeviceCode, TimeoutOccurred +from requests_auth.testing import token_cache # noqa: F401 + +DEVICE_CODE_GRANT = "urn:ietf:params:oauth:grant-type:device_code" + +# Common test constants +CLIENT_ID = "0d15afb1-2e83-487f-9d5a-fdd241d03db2" +DEVICE_CODE = "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS" +USER_CODE = "WDJB-MJHT" +VERIFICATION_URI = "https://example.com/device" +VERIFICATION_URI_COMPLETE = f"{VERIFICATION_URI}?user_code={USER_CODE}" +ACCESS_TOKEN = "2YotnFZFEjr1zCsicMWpAA" +REFRESH_TOKEN = "tGzv3JOkF0XG5Qx2TlKWIA" +TOKEN_TYPE = "example" +SCOPE = "read_data" +AUTHORIZATION_URL = "http://provide_code" +TOKEN_URL = "http://provide_device_code" + + +def make_authorization_response( + device_code=DEVICE_CODE, + user_code=USER_CODE, + verification_uri=VERIFICATION_URI, + verification_uri_complete=VERIFICATION_URI_COMPLETE, + expires_in=1800, + interval=5, +): + return { + "device_code": device_code, + "user_code": user_code, + "verification_uri": verification_uri, + "verification_uri_complete": verification_uri_complete, + "expires_in": expires_in, + "interval": interval, + } + + +def make_token_response( + access_token=ACCESS_TOKEN, + token_type=TOKEN_TYPE, + expires_in=3600, + refresh_token=REFRESH_TOKEN, + scope=SCOPE, +): + return { + "access_token": access_token, + "token_type": token_type, + "expires_in": expires_in, + "refresh_token": refresh_token, + "scope": scope, + } + + +def test_oauth2_device_code_flow_uses_provided_session( + token_cache, responses: RequestsMock +): + session = requests.Session() + session.headers.update({"x-test": "Test value"}) + auth = requests_auth.OAuth2DeviceCode( + AUTHORIZATION_URL, + TOKEN_URL, + client_id=CLIENT_ID, + session=session, + ) + responses.post( + AUTHORIZATION_URL, + json=make_authorization_response(), + match=[ + urlencoded_params_matcher({"client_id": CLIENT_ID}), + header_matcher({"x-test": "Test value"}), + ], + ) + responses.post( + TOKEN_URL, + json=make_token_response(), + match=[ + urlencoded_params_matcher({ + "grant_type": DEVICE_CODE_GRANT, + "device_code": DEVICE_CODE, + "client_id": CLIENT_ID, + }), + header_matcher({"x-test": "Test value"}), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": f"Bearer {ACCESS_TOKEN}"})], + ) + requests.get("http://authorized_only", auth=auth) + + + + +def test_oauth2_device_code_flow_token_is_expired_after_30_seconds_by_default(token_cache, responses: RequestsMock): + auth = requests_auth.OAuth2DeviceCode( + AUTHORIZATION_URL, + TOKEN_URL, + client_id=CLIENT_ID, + ) + token_cache._add_token( + key="5e668eeebaca3355b71f27143ebb2972f9bb6839844dc4ae0c519a1ac4e8fdb132c6ad864169e2b6e2aee9f317661fc5f3e7a2ee066284d468fb54c44bf29682", + token=ACCESS_TOKEN, + expiry=requests_auth._oauth2.tokens._to_expiry(expires_in=29), + ) + responses.post( + AUTHORIZATION_URL, + json=make_authorization_response(), + match=[urlencoded_params_matcher({"client_id": CLIENT_ID})], + ) + responses.post( + TOKEN_URL, + json=make_token_response(), + match=[urlencoded_params_matcher({ + "grant_type": DEVICE_CODE_GRANT, + "device_code": DEVICE_CODE, + "client_id": CLIENT_ID, + })], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": f"Bearer {ACCESS_TOKEN}"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_oauth2_device_code_flow_token_custom_expiry( + token_cache, responses: RequestsMock +): + auth = requests_auth.OAuth2DeviceCode( + AUTHORIZATION_URL, + "http://provide_access_token", + client_id=CLIENT_ID, + early_expiry=28, + ) + token_cache._add_token( + key="5e668eeebaca3355b71f27143ebb2972f9bb6839844dc4ae0c519a1ac4e8fdb132c6ad864169e2b6e2aee9f317661fc5f3e7a2ee066284d468fb54c44bf29682", + token=ACCESS_TOKEN, + expiry=requests_auth._oauth2.tokens._to_expiry(expires_in=29), + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": f"Bearer {ACCESS_TOKEN}"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_refresh_token(token_cache, responses: RequestsMock): + auth = requests_auth.OAuth2DeviceCode( + AUTHORIZATION_URL, + TOKEN_URL, + client_id=CLIENT_ID, + ) + responses.post( + AUTHORIZATION_URL, + json=make_authorization_response(), + match=[urlencoded_params_matcher({"client_id": CLIENT_ID})], + ) + responses.post( + TOKEN_URL, + json=make_token_response(expires_in=0), + match=[urlencoded_params_matcher({ + "grant_type": DEVICE_CODE_GRANT, + "device_code": DEVICE_CODE, + "client_id": CLIENT_ID, + })], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": f"Bearer {ACCESS_TOKEN}"})], + ) + requests.get("http://authorized_only", auth=auth) + responses.post( + TOKEN_URL, + json=make_token_response(access_token="rVR7Syg5bjZtZYjbZIW"), + match=[urlencoded_params_matcher({ + "grant_type": "refresh_token", + "refresh_token": REFRESH_TOKEN, + })], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer rVR7Syg5bjZtZYjbZIW"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_refresh_token_invalid(token_cache, responses: RequestsMock): + auth = requests_auth.OAuth2DeviceCode( + "http://provide_code", + "http://provide_device_code", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + ) + # Setup initial authentication responses + responses.post( + "http://provide_code", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJB-MJHT", + "verification_uri": "https://example.com/device", + "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.post( + "http://provide_device_code", + json={ + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 0, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "read_data", + }, + match=[ + urlencoded_params_matcher( + { + "grant_type": DEVICE_CODE_GRANT, + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + # response for refresh token grant + responses.post( + "http://provide_device_code", + json={"error": "invalid_request"}, + status=400, + match=[ + urlencoded_params_matcher( + { + "grant_type": "refresh_token", + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + } + ) + ], + ) + + # if refreshing the token fails, fallback to requesting a new token + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_refresh_token_access_token_not_expired(token_cache, responses: RequestsMock): + auth = requests_auth.OAuth2DeviceCode( + "http://provide_code", + "http://provide_device_code", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + ) + # Setup initial authentication responses + responses.post( + "http://provide_code", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJB-MJHT", + "verification_uri": "https://example.com/device", + "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.post( + "http://provide_device_code", + json={ + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "read_data", + }, + match=[ + urlencoded_params_matcher( + { + "grant_type": DEVICE_CODE_GRANT, + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + # expect Bearer token to remain the same + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_empty_token_is_invalid(token_cache, responses: RequestsMock): + auth = requests_auth.OAuth2DeviceCode( + "http://provide_code", + "http://provide_device_code", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + ) + # Setup initial authentication responses + responses.post( + "http://provide_code", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJB-MJHT", + "verification_uri": "https://example.com/device", + "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.post( + "http://provide_device_code", + json={ + "access_token": "", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "read_data", + }, + ) + with pytest.raises(requests_auth.GrantNotProvided) as exception_info: + requests.get("http://authorized_only", auth=auth) + assert ( + str(exception_info.value) + == "access_token not provided within {'access_token': '', 'token_type': 'example', 'expires_in': 3600, 'refresh_token': 'tGzv3JOkF0XG5Qx2TlKWIA', 'scope': 'read_data'}." + ) + assert isinstance(exception_info.value, requests_auth.RequestsAuthException) + assert isinstance(exception_info.value, requests.RequestException) + + +class TokenResponder: + def __init__( + self, response_mock: RequestsMock, token_url: str, status_code: int = 400 + ): + self._status_code = status_code + self._mock = response_mock + self._responses = [] + self._token_url = token_url + + def assert_all_calls(self): + assert len(self._responses) == 0 + + def _response_callback( + self, request: requests.PreparedRequest + ) -> tuple[int, dict[str, Any], str]: + try: + response_data = self._responses.pop(0) + status_code = response_data.pop("status") + except IndexError: + response_data = {} + status_code = 404 + return ( + status_code, + {"content-type": "application/json"}, + json.dumps(response_data), + ) + + def configure_responses( + self, + response_types: Iterable[ + Literal["authorization_pending"] | Literal["slow_down"] | Literal["ok"] + ], + ) -> None: + for response_type in response_types: + if response_type == "authorization_pending": + self._responses.append( + {"status": self._status_code, "error": "authorization_pending"} + ) + elif response_type == "slow_down": + self._responses.append( + {"status": self._status_code, "error": "slow_down"} + ) + elif response_type == "ok": + self._responses.append( + { + "status": 200, + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "read_data", + } + ) + self._mock.add_callback("POST", self._token_url, self._response_callback) + + +class TestPollingBehaviour: + _authorization_url = "http://provide_code/" + _token_url = "http://provide_device_code/" + _device_code = "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS" + + @pytest.fixture(scope="function") + def auth(self) -> tuple[OAuth2DeviceCode, str]: + client_id = str(uuid.uuid4()) + return ( + requests_auth.OAuth2DeviceCode( + self._authorization_url, + self._token_url, + client_id=client_id, + ), + client_id, + ) + + def _configure_authorization_response( + self, + response_mock: RequestsMock, + client_id: str, + expires_in: int, + interval: int, + ) -> None: + response_mock.post( + "http://provide_code", + json={ + "device_code": self._device_code, + "user_code": "WDJB-MJHT", + "verification_uri": "https://example.com/device", + "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", + "expires_in": expires_in, + "interval": interval, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": client_id, + } + ), + ], + ) + + def assert_sleep_calls(self, patched_call, sleep_calls: tuple[int, ...]) -> None: + assert patched_call.call_count == len(sleep_calls) + patched_call.assert_has_calls([call(item) for item in sleep_calls]) + + def test_authorization_polls_expected_count( + self, responses: RequestsMock, auth: OAuth2DeviceCode + ) -> None: + device_code_auth, client_id = auth + self._configure_authorization_response(responses, client_id, 60, 5) + responder = TokenResponder(responses, self._token_url) + + # Two authorization_pending with 5s expiry + responder.configure_responses( + ("authorization_pending", "authorization_pending", "ok") + ) + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + with patch("time.sleep", return_value=None) as patched_sleep: + requests.get("http://authorized_only", auth=device_code_auth) + + responder.assert_all_calls() + self.assert_sleep_calls(patched_sleep, (5, 5)) + + def test_authorization_polls_expected_count_with_slow_down( + self, responses: RequestsMock, auth: OAuth2DeviceCode + ) -> None: + device_code_auth, client_id = auth + self._configure_authorization_response(responses, client_id, 60, 5) + responder = TokenResponder(responses, self._token_url) + + # Two authorization_pending with 5s expiry + responder.configure_responses(("authorization_pending", "slow_down", "ok")) + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + with patch("time.sleep", return_value=None) as patched_sleep: + requests.get("http://authorized_only", auth=device_code_auth) + + responder.assert_all_calls() + self.assert_sleep_calls(patched_sleep, (5, 10)) + + def test_authorization_slow_down_persists( + self, responses: RequestsMock, auth: OAuth2DeviceCode + ) -> None: + device_code_auth, client_id = auth + self._configure_authorization_response(responses, client_id, 60, 5) + responder = TokenResponder(responses, self._token_url) + + responder.configure_responses( + ( + "authorization_pending", + "slow_down", + "authorization_pending", + "authorization_pending", + "ok", + ) + ) + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + with patch("time.sleep", return_value=None) as patched_sleep: + requests.get("http://authorized_only", auth=device_code_auth) + + responder.assert_all_calls() + self.assert_sleep_calls(patched_sleep, (5, 10, 10, 10)) + + def test_authorization_times_out( + self, responses: RequestsMock, auth: OAuth2DeviceCode + ) -> None: + device_code_auth, client_id = auth + self._configure_authorization_response(responses, client_id, 9, 5) + responder = TokenResponder(responses, self._token_url) + + # Two authorization_pending with 5s expiry + responder.configure_responses( + ("authorization_pending", "authorization_pending") + ) + + with pytest.raises( + TimeoutOccurred, + match="User authentication was not received within 9 seconds.", + ): + requests.get("http://authorized_only", auth=device_code_auth) + + +class TestInvalidTokenRequest: + _authorization_url = "http://provide_code" + _token_url = "http://provide_device_code" + _client_id = "0d15afb1-2e83-487f-9d5a-fdd241d03db2" + _device_code = "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS" + + @pytest.fixture(scope="function") + def auth(self) -> OAuth2DeviceCode: + return requests_auth.OAuth2DeviceCode( + self._authorization_url, + self._token_url, + client_id=self._client_id, + ) + + def _configure_authorization_response(self, response_mock: RequestsMock) -> None: + response_mock.post( + "http://provide_code", + json={ + "device_code": self._device_code, + "user_code": "WDJB-MJHT", + "verification_uri": "https://example.com/device", + "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": self._client_id, + } + ), + ], + ) + + def _configure_token_error( + self, response_mock: RequestsMock, error_object: dict | str + ) -> None: + response_mock.post( + self._token_url, + json=error_object, + match=[ + urlencoded_params_matcher( + { + "grant_type": DEVICE_CODE_GRANT, + "device_code": self._device_code, + "client_id": self._client_id, + } + ), + ], + status=400, + ) + + def test_with_invalid_request( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authorization_response(responses) + self._configure_token_error(responses, {"error": "invalid_request"}) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert ( + str(exception_info.value) + == "invalid_request: The request is missing a required parameter, includes an " + "unsupported parameter value (other than grant type), repeats a parameter, " + "includes multiple credentials, utilizes more than one mechanism for " + "authenticating the client, or is otherwise malformed." + ) + + def test_with_invalid_request_and_description( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authorization_response(responses) + self._configure_token_error( + responses, + {"error": "invalid_request", "error_description": "desc of the error"}, + ) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert str(exception_info.value) == "invalid_request: desc of the error" + + def test_with_invalid_request_description_and_url( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authorization_response(responses) + self._configure_token_error( + responses, + { + "error": "invalid_request", + "error_description": "desc of the error", + "error_uri": "http://test_url", + }, + ) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert ( + str(exception_info.value) + == f"invalid_request: desc of the error\nMore information can be found on http://test_url" + ) + + def test_with_invalid_request_description_url_and_other_fields( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authorization_response(responses) + self._configure_token_error( + responses, + { + "error": "invalid_request", + "error_description": "desc of the error", + "error_uri": "http://test_url", + "other": "other info", + }, + ) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert ( + str(exception_info.value) + == "invalid_request: desc of the error\nMore information can be found on http://test_url\nAdditional information: {'other': 'other info'}" + ) + + def test_with_no_error_field( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authorization_response(responses) + self._configure_token_error(responses, {"other": "other info"}) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert str(exception_info.value) == "{'other': 'other info'}" + + def test_with_invalid_client_error_returns_default_error_text( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authorization_response(responses) + self._configure_token_error(responses, {"error": "invalid_client"}) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert ( + str(exception_info.value) + == "invalid_client: Client authentication failed (e.g., unknown client, no " + "client authentication included, or unsupported authentication method). The " + "authorization server MAY return an HTTP 401 (Unauthorized) status code to " + "indicate which HTTP authentication schemes are supported. If the client " + 'attempted to authenticate via the "Authorization" request header field, the ' + "authorization server MUST respond with an HTTP 401 (Unauthorized) status " + 'code and include the "WWW-Authenticate" response header field matching the ' + "authentication scheme used by the client." + ) + + def test_with_invalid_grant_error_returns_default_error_text( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authorization_response(responses) + self._configure_token_error(responses, {"error": "invalid_grant"}) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert ( + str(exception_info.value) + == "invalid_grant: The provided authorization grant (e.g., authorization code, " + "resource owner credentials) or refresh token is invalid, expired, revoked, " + "does not match the redirection URI used in the authorization request, or was " + "issued to another client." + ) + + def test_with_unauthorized_client_error_returns_default_error_text( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authorization_response(responses) + self._configure_token_error(responses, {"error": "unauthorized_client"}) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert ( + str(exception_info.value) + == "unauthorized_client: The authenticated client is not authorized to use this " + "authorization grant type." + ) + + def test_with_unsupported_grant_type_returns_default_error_text( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authorization_response(responses) + self._configure_token_error(responses, {"error": "unsupported_grant_type"}) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert ( + str(exception_info.value) + == "unsupported_grant_type: The authorization grant type is not supported by the " + "authorization server." + ) + + def test_with_invalid_scope_returns_default_error_text( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authorization_response(responses) + self._configure_token_error(responses, {"error": "invalid_scope"}) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert ( + str(exception_info.value) + == "invalid_scope: The requested scope is invalid, unknown, malformed, or " + "exceeds the scope granted by the resource owner." + ) + + +class TestInvalidAuthorizationRequest: + _authorization_url = "http://provide_code" + _token_url = "http://provide_device_code" + _client_id = "0d15afb1-2e83-487f-9d5a-fdd241d03db2" + + @pytest.fixture(scope="function") + def auth(self) -> OAuth2DeviceCode: + return requests_auth.OAuth2DeviceCode( + self._authorization_url, + self._token_url, + client_id=self._client_id, + ) + + def _configure_authentication_error( + self, responses: RequestsMock, error_object: dict + ) -> None: + responses.post( + self._authorization_url, + json=error_object, + match=[ + urlencoded_params_matcher( + { + "client_id": self._client_id, + } + ), + ], + status=400, + ) + + def test_invalid_request_with_no_details_gives_default_error( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ) -> None: + self._configure_authentication_error(responses, {"error": "invalid_request"}) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert ( + str(exception_info.value) + == "invalid_request: The request is missing a required parameter, includes an unsupported parameter value (other than grant type), repeats a parameter, includes multiple credentials, utilizes more than one mechanism for authenticating the client, or is otherwise malformed." + ) + + def test_invalid_request_with_error_description_gives_provided_description( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authentication_error( + responses, {"error": "invalid_request", "error_description": "desc"} + ) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert str(exception_info.value) == "invalid_request: desc" + + def test_invalid_request_with_error_description_and_url_provides_description_and_url( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authentication_error( + responses, + { + "error": "invalid_request", + "error_description": "desc", + "error_uri": "http://test_url", + }, + ) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert ( + str(exception_info.value) + == "invalid_request: desc\nMore information can be found on http://test_url" + ) + + def test_invalid_request_with_error_description_url_and_other_fields_provides_description_url_and_other_fields( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authentication_error( + responses, + { + "error": "invalid_request", + "error_description": "desc", + "error_uri": "http://test_url", + "other": ["test"], + }, + ) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert ( + str(exception_info.value) + == "invalid_request: desc\nMore information can be found on http://test_url\nAdditional information: {'other': ['test']}" + ) + + def test_unauthorized( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authentication_error( + responses, {"error": "unauthorized_client"} + ) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert ( + str(exception_info.value) + == "unauthorized_client: The authenticated client is not authorized to use this authorization grant type." + ) + + def test_with_invalid_scope( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authentication_error(responses, {"error": "invalid_scope"}) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert ( + str(exception_info.value) + == "invalid_scope: The requested scope is invalid, unknown, malformed, or exceeds the scope granted by the resource owner." + ) + + +def test_authorization_url_is_mandatory(): + with pytest.raises(Exception) as exception_info: + _ = requests_auth.OAuth2DeviceCode( + "", + "http://provide_device_code", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + ) + assert str(exception_info.value) == "Authorization URL is mandatory." + + +def test_token_url_is_mandatory(): + with pytest.raises(Exception) as exception_info: + _ = requests_auth.OAuth2DeviceCode( + "http://provide_code", "", client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2" + ) + assert str(exception_info.value) == "Token URL is mandatory." + + +def test_client_id_is_mandatory(): + with pytest.raises(Exception) as exception_info: + _ = requests_auth.OAuth2DeviceCode( + "http://provide_code", "http://provide_device_code", client_id="" + ) + assert str(exception_info.value) == "Client ID is mandatory." + + +def test_header_value_must_contains_token(): + with pytest.raises(Exception) as exception_info: + _ = requests_auth.OAuth2DeviceCode( + "http://provide_code", + "http://provide_device_code", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + header_value="Bearer token", + ) + assert str(exception_info.value) == "header_value parameter must contains {token}." + + +def test_oauth2_device_code_flow_prints_verification_url_and_user_code_to_stdout(token_cache, responses: RequestsMock, capsys): + auth = requests_auth.OAuth2DeviceCode( + AUTHORIZATION_URL, + TOKEN_URL, + client_id=CLIENT_ID, + ) + responses.post( + AUTHORIZATION_URL, + json=make_authorization_response(), + match=[urlencoded_params_matcher({"client_id": CLIENT_ID})], + ) + responses.post( + TOKEN_URL, + json=make_token_response(), + match=[urlencoded_params_matcher({ + "grant_type": DEVICE_CODE_GRANT, + "device_code": DEVICE_CODE, + "client_id": CLIENT_ID, + })], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": f"Bearer {ACCESS_TOKEN}"})], + ) + requests.get("http://authorized_only", auth=auth) + captured = capsys.readouterr() + assert VERIFICATION_URI in captured.out + assert USER_CODE in captured.out + +def test_oauth2_device_code_flow_respects_prompt_callback(token_cache, responses: RequestsMock, capsys): + called = {} + def custom_prompt_callback(verification_uri, user_code): + called['uri'] = verification_uri + called['code'] = user_code + auth = requests_auth.OAuth2DeviceCode( + AUTHORIZATION_URL, + TOKEN_URL, + client_id=CLIENT_ID, + prompt_callback=custom_prompt_callback, + ) + responses.post( + AUTHORIZATION_URL, + json=make_authorization_response(), + match=[urlencoded_params_matcher({"client_id": CLIENT_ID})], + ) + responses.post( + TOKEN_URL, + json=make_token_response(), + match=[urlencoded_params_matcher({ + "grant_type": DEVICE_CODE_GRANT, + "device_code": DEVICE_CODE, + "client_id": CLIENT_ID, + })], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": f"Bearer {ACCESS_TOKEN}"})], + ) + requests.get("http://authorized_only", auth=auth) + captured = capsys.readouterr() + assert called['uri'] == VERIFICATION_URI + assert called['code'] == USER_CODE + assert captured.out == ""