From 00c77968671382b9f27f6e708016a22ed6877e71 Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Sun, 26 Oct 2025 21:52:13 +0000 Subject: [PATCH 01/12] Add intial implementation of device code flow --- requests_auth/__init__.py | 2 + requests_auth/_oauth2/device_code.py | 191 +++ tests/oauth2/device_code/__init__.py | 0 .../device_code/test_outh2_device_code.py | 1026 +++++++++++++++++ 4 files changed, 1219 insertions(+) create mode 100644 requests_auth/_oauth2/device_code.py create mode 100644 tests/oauth2/device_code/__init__.py create mode 100644 tests/oauth2/device_code/test_outh2_device_code.py diff --git a/requests_auth/__init__.py b/requests_auth/__init__.py index c19a25b..c9c9b27 100644 --- a/requests_auth/__init__.py +++ b/requests_auth/__init__.py @@ -16,6 +16,7 @@ OAuth2AuthorizationCodePKCE, OktaAuthorizationCodePKCE, ) +from requests_auth._oauth2.device_code import OAuth2DeviceCode from requests_auth._oauth2.client_credentials import ( OAuth2ClientCredentials, OktaClientCredentials, @@ -52,6 +53,7 @@ "DisplaySettings", "OAuth2AuthorizationCodePKCE", "OktaAuthorizationCodePKCE", + "OAuth2DeviceCode", "OAuth2Implicit", "OktaImplicit", "OktaImplicitIdToken", diff --git a/requests_auth/_oauth2/device_code.py b/requests_auth/_oauth2/device_code.py new file mode 100644 index 0000000..ca04381 --- /dev/null +++ b/requests_auth/_oauth2/device_code.py @@ -0,0 +1,191 @@ +import time + +from hashlib import sha512 +from typing import cast + +import requests + +from requests_auth._errors import InvalidGrantRequest, TimeoutOccurred, GrantNotProvided +from requests_auth._authentication import SupportMultiAuth +from requests_auth._oauth2.common import ( + OAuth2, + request_new_grant_with_post, + _content_from_response, +) + + +class OAuth2DeviceCode(requests.auth.AuthBase, SupportMultiAuth): + """ + Device Code Grant + + Describes an OAuth 2 device code flow authentication. + More details can be found in https://datatracker.ietf.org/doc/html/rfc8628 + """ + + def __init__( + self, authorization_url: str, token_url: str, client_id: str, **kwargs + ) -> None: + """ + :param authorization_url: OAuth 2 authorization URL. + :param token_url: OAuth 2 token URL. + :param client_id: Resource owner username. + :param timeout: Maximum amount of seconds to wait for a token to be received once requested. + Wait for 1 minute by default. + :param prefer_complete_verification_url: If supported, return the complete verification URL to avoid the need + to enter the code. If false or not supported, the device code will be returned. + :param header_name: Name of the header field used to send token. + Token will be sent in Authorization header field by default. + :param header_value: Format used to send the token value. + "{token}" must be present as it will be replaced by the actual token. + Token will be sent as "Bearer {token}" by default. + :param scope: Scope parameter sent to token URL as body. Can also be a list of scopes. Not sent by default. + :param token_field_name: Field name containing the token. access_token by default. + :param early_expiry: Number of seconds before actual token expiry where token will be considered as expired. + Default to 30 seconds to ensure token will not expire between the time of retrieval and the time the request + reaches the actual server. Set it to 0 to deactivate this feature and use the same token until actual expiry. + :param session: requests.Session instance that will be used to request the token. + Use it to provide a custom proxying rule for instance. + :param kwargs: all additional authorization parameters that should be put as query parameter in the token URL. + """ + self.authorization_url = authorization_url + if not self.authorization_url: + raise Exception("Authorization URL is mandatory.") + self.token_url = token_url + if not self.token_url: + raise Exception("Token URL is mandatory.") + self.client_id = client_id + if not self.client_id: + raise Exception("Client ID is mandatory.") + + self.header_name = kwargs.pop("header_name", None) or "Authorization" + self.header_value = kwargs.pop("header_value", None) or "Bearer {token}" + if "{token}" not in self.header_value: + raise Exception("header_value parameter must contains {token}.") + + self.token_field_name = kwargs.pop("token_field_name", None) or "access_token" + self.early_expiry = float(kwargs.pop("early_expiry", None) or 30.0) + self.prefer_complete_verification_url = kwargs.pop( + "prefer_complete_verification_url", False + ) + + # Time is expressed in seconds + self.timeout = int(kwargs.pop("timeout", None) or 60) + + self.session = kwargs.pop("session", None) or requests.Session() + self.session.timeout = self.timeout + + # As described in https://datatracker.ietf.org/doc/html/rfc8628#section-3.1 + self.authorization_data = {"client_id": self.client_id} + scope = kwargs.pop("scope", None) + if scope: + self.authorization_data["scope"] = ( + " ".join(scope) if isinstance(scope, list) else scope + ) + self.data = kwargs + self.state = sha512( + (self.authorization_url + self.token_url + self.client_id).encode( + "unicode_escape" + ) + ).hexdigest() + + # As described in https://tools.ietf.org/html/rfc6749#section-6 + self.refresh_data = {"grant_type": "refresh_token"} + self.refresh_data.update(kwargs) + + def __call__(self, r: requests.Request) -> requests.Request: + token = OAuth2.token_cache.get_token( + key=self.state, + early_expiry=self.early_expiry, + on_missing_token=self.request_new_token, + on_expired_token=self.refresh_token, + ) + r.headers[self.header_name] = self.header_value.format(token=token) + return r + + def request_new_token(self) -> tuple[str, str] | tuple[str, str, int, str]: + # As described in https://datatracker.ietf.org/doc/html/rfc8628#section-3.1 + + authorization_response: requests.Response = self.session.post( + self.authorization_url, + data=self.authorization_data, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) + + if not authorization_response: + raise InvalidGrantRequest(authorization_response) + + response_data = authorization_response.json() + device_code = response_data["device_code"] + user_code = response_data["user_code"] + verification_uri = response_data["verification_uri"] + request_expires_in = response_data["expires_in"] + + verification_uri_complete = response_data.get("verification_uri_complete", None) + if ( + self.prefer_complete_verification_url + and verification_uri_complete is not None + ): + verification_uri = verification_uri_complete + + interval = response_data.get("interval", 5) + start_time = time.time() + print("Device Code login request:") + print( + f"Navigate to {verification_uri} on any device and enter the device code: {user_code}" + ) + + token_request_data = { + "grant_type": "urn:ietf:params:oauth:grant-type:device_code", + "device_code": device_code, + "client_id": self.client_id, + } + while time.time() - start_time < request_expires_in: + token_response = self.session.post( + self.token_url, + data=token_request_data, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) + + if token_response.status_code == 200: + # User has authenticated, a token is returned + content = _content_from_response(token_response) + token = content.get("access_token", None) + if not token: + raise GrantNotProvided("access_token", content) + token_expires_in = content.get("expires_in", None) + return ( + ( + self.state, + cast(str, content.get("access_token")), + cast(int, token_expires_in), + cast(str, content.get("refresh_token")), + ) + if token_expires_in is not None + else (self.state, cast(str, content.get("access_token"))) + ) + if token_response.status_code == 400: + # User has not authenticated, or something has gone wrong. There are two expected errors we could + # receive here which are normal. + error_content = _content_from_response(token_response) + error_type = error_content.get("error", None) + if error_type == "authorization_pending": + time.sleep(interval) + continue + if error_type == "slow_down": + interval += 5 + time.sleep(interval) + continue + raise InvalidGrantRequest(token_response) + raise TimeoutOccurred(request_expires_in) + + def refresh_token(self, refresh_token: str) -> tuple[str, str, int, str]: + # As described in https://tools.ietf.org/html/rfc6749#section-6 + self.refresh_data["refresh_token"] = refresh_token + token, expires_in, refresh_token = request_new_grant_with_post( + self.token_url, + self.refresh_data, + self.token_field_name, + self.timeout, + self.session, + ) + return self.state, token, expires_in, refresh_token diff --git a/tests/oauth2/device_code/__init__.py b/tests/oauth2/device_code/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/oauth2/device_code/test_outh2_device_code.py b/tests/oauth2/device_code/test_outh2_device_code.py new file mode 100644 index 0000000..b100d4d --- /dev/null +++ b/tests/oauth2/device_code/test_outh2_device_code.py @@ -0,0 +1,1026 @@ +import json +import uuid +from typing import Literal, Any, Iterable +from unittest.mock import patch, call + +from responses import RequestsMock +from responses.matchers import urlencoded_params_matcher, header_matcher +import pytest +import requests + +import requests_auth +from requests_auth import OAuth2DeviceCode, TimeoutOccurred +from requests_auth.testing import BrowserMock, browser_mock, token_cache # noqa: F401 + +DEVICE_CODE_GRANT = "urn:ietf:params:oauth:grant-type:device_code" + + +def test_oauth2_device_code_flow_uses_provided_session( + token_cache, responses: RequestsMock +): + session = requests.Session() + session.headers.update({"x-test": "Test value"}) + auth = requests_auth.OAuth2DeviceCode( + "http://provide_code", + "http://provide_device_code", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + session=session, + ) + responses.post( + "http://provide_code", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJB-MJHT", + "verification_uri": "https://example.com/device", + "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + header_matcher({"x-test": "Test value"}), + ], + ) + responses.post( + "http://provide_device_code", + json={ + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "read_data", + }, + match=[ + urlencoded_params_matcher( + { + "grant_type": DEVICE_CODE_GRANT, + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + header_matcher({"x-test": "Test value"}), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_oauth2_authorization_code_flow_get_code_is_sent_in_authorization_header_by_default( + token_cache, responses: RequestsMock, browser_mock: BrowserMock +): + auth = requests_auth.OAuth2AuthorizationCode( + "http://provide_code", "http://provide_access_token" + ) + tab = browser_mock.add_response( + opened_url="http://provide_code?response_type=code&state=163f0455b3e9cad3ca04254e5a0169553100d3aa0756c7964d897da316a695ffed5b4f46ef305094fd0a88cfe4b55ff257652015e4aa8f87b97513dba440f8de&redirect_uri=http%3A%2F%2Flocalhost%3A5000%2F", + reply_url="http://localhost:5000#code=SplxlOBeZQQYbYS6WxSbIA&state=163f0455b3e9cad3ca04254e5a0169553100d3aa0756c7964d897da316a695ffed5b4f46ef305094fd0a88cfe4b55ff257652015e4aa8f87b97513dba440f8de", + ) + responses.post( + "http://provide_access_token", + json={ + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "example_parameter": "example_value", + }, + match=[ + urlencoded_params_matcher( + { + "grant_type": "authorization_code", + "redirect_uri": "http://localhost:5000/", + "response_type": "code", + "code": "SplxlOBeZQQYbYS6WxSbIA", + } + ), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + tab.assert_success() + + +def test_oauth2_authorization_code_flow_token_is_expired_after_30_seconds_by_default( + token_cache, responses: RequestsMock +): + auth = requests_auth.OAuth2DeviceCode( + "http://provide_code", + "http://provide_device_code", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + ) + # Add a token that expires in 29 seconds, so should be considered as expired when issuing the request + token_cache._add_token( + key="5e668eeebaca3355b71f27143ebb2972f9bb6839844dc4ae0c519a1ac4e8fdb132c6ad864169e2b6e2aee9f317661fc5f3e7a2ee066284d468fb54c44bf29682", + token="2YotnFZFEjr1zCsicMWpAA", + expiry=requests_auth._oauth2.tokens._to_expiry(expires_in=29), + ) + # Meaning a new one will be requested + responses.post( + "http://provide_code", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJB-MJHT", + "verification_uri": "https://example.com/device", + "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.post( + "http://provide_device_code", + json={ + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "read_data", + }, + match=[ + urlencoded_params_matcher( + { + "grant_type": DEVICE_CODE_GRANT, + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_oauth2_device_code_flow_token_custom_expiry( + token_cache, responses: RequestsMock, browser_mock: BrowserMock +): + auth = requests_auth.OAuth2DeviceCode( + "http://provide_code", + "http://provide_access_token", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + early_expiry=28, + ) + # Add a token that expires in 29 seconds, so should be considered as not expired when issuing the request + token_cache._add_token( + key="5e668eeebaca3355b71f27143ebb2972f9bb6839844dc4ae0c519a1ac4e8fdb132c6ad864169e2b6e2aee9f317661fc5f3e7a2ee066284d468fb54c44bf29682", + token="2YotnFZFEjr1zCsicMWpAA", + expiry=requests_auth._oauth2.tokens._to_expiry(expires_in=29), + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_refresh_token(token_cache, responses: RequestsMock, browser_mock: BrowserMock): + auth = requests_auth.OAuth2DeviceCode( + "http://provide_code", + "http://provide_device_code", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + ) + # Setup initial authentication responses + responses.post( + "http://provide_code", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJB-MJHT", + "verification_uri": "https://example.com/device", + "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.post( + "http://provide_device_code", + json={ + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 0, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "read_data", + }, + match=[ + urlencoded_params_matcher( + { + "grant_type": DEVICE_CODE_GRANT, + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + # Setup auth handler + requests.get("http://authorized_only", auth=auth) + + # Response for refresh token grant + responses.post( + "http://provide_device_code", + json={ + "access_token": "rVR7Syg5bjZtZYjbZIW", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "read_data", + }, + match=[ + urlencoded_params_matcher( + { + "grant_type": "refresh_token", + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + } + ) + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer rVR7Syg5bjZtZYjbZIW"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_refresh_token_invalid(token_cache, responses: RequestsMock): + auth = requests_auth.OAuth2DeviceCode( + "http://provide_code", + "http://provide_device_code", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + ) + # Setup initial authentication responses + responses.post( + "http://provide_code", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJB-MJHT", + "verification_uri": "https://example.com/device", + "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.post( + "http://provide_device_code", + json={ + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 0, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "read_data", + }, + match=[ + urlencoded_params_matcher( + { + "grant_type": DEVICE_CODE_GRANT, + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + # response for refresh token grant + responses.post( + "http://provide_device_code", + json={"error": "invalid_request"}, + status=400, + match=[ + urlencoded_params_matcher( + { + "grant_type": "refresh_token", + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + } + ) + ], + ) + + # if refreshing the token fails, fallback to requesting a new token + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_refresh_token_access_token_not_expired(token_cache, responses: RequestsMock): + auth = requests_auth.OAuth2DeviceCode( + "http://provide_code", + "http://provide_device_code", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + ) + # Setup initial authentication responses + responses.post( + "http://provide_code", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJB-MJHT", + "verification_uri": "https://example.com/device", + "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.post( + "http://provide_device_code", + json={ + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "read_data", + }, + match=[ + urlencoded_params_matcher( + { + "grant_type": DEVICE_CODE_GRANT, + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + # expect Bearer token to remain the same + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_empty_token_is_invalid(token_cache, responses: RequestsMock): + auth = requests_auth.OAuth2DeviceCode( + "http://provide_code", + "http://provide_device_code", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + ) + # Setup initial authentication responses + responses.post( + "http://provide_code", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJB-MJHT", + "verification_uri": "https://example.com/device", + "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.post( + "http://provide_device_code", + json={ + "access_token": "", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "read_data", + }, + ) + with pytest.raises(requests_auth.GrantNotProvided) as exception_info: + requests.get("http://authorized_only", auth=auth) + assert ( + str(exception_info.value) + == "access_token not provided within {'access_token': '', 'token_type': 'example', 'expires_in': 3600, 'refresh_token': 'tGzv3JOkF0XG5Qx2TlKWIA', 'scope': 'read_data'}." + ) + assert isinstance(exception_info.value, requests_auth.RequestsAuthException) + assert isinstance(exception_info.value, requests.RequestException) + + +class TokenResponder: + def __init__(self, response_mock: RequestsMock, token_url: str): + self._mock = response_mock + self._responses = [] + self._token_url = token_url + + def assert_all_calls(self): + assert len(self._responses) == 0 + + def _response_callback( + self, request: requests.PreparedRequest + ) -> tuple[int, dict[str, Any], str]: + try: + response_data = self._responses.pop(0) + status_code = response_data.pop("status") + except IndexError: + response_data = {} + status_code = 404 + return ( + status_code, + {"content-type": "application/json"}, + json.dumps(response_data), + ) + + def configure_responses( + self, + response_types: Iterable[ + Literal["authorization_pending"] | Literal["slow_down"] | Literal["ok"] + ], + ) -> None: + for response_type in response_types: + if response_type == "authorization_pending": + self._responses.append( + {"status": 400, "error": "authorization_pending"} + ) + elif response_type == "slow_down": + self._responses.append({"status": 400, "error": "slow_down"}) + elif response_type == "ok": + self._responses.append( + { + "status": 200, + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "read_data", + } + ) + self._mock.add_callback("POST", self._token_url, self._response_callback) + + +class TestPollingBehaviour: + _authorization_url = "http://provide_code/" + _token_url = "http://provide_device_code/" + _device_code = "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS" + + @pytest.fixture(scope="function") + def auth(self) -> tuple[OAuth2DeviceCode, str]: + client_id = str(uuid.uuid4()) + return ( + requests_auth.OAuth2DeviceCode( + self._authorization_url, + self._token_url, + client_id=client_id, + ), + client_id, + ) + + def _configure_authorization_response( + self, + response_mock: RequestsMock, + client_id: str, + expires_in: int, + interval: int, + ) -> None: + response_mock.post( + "http://provide_code", + json={ + "device_code": self._device_code, + "user_code": "WDJB-MJHT", + "verification_uri": "https://example.com/device", + "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", + "expires_in": expires_in, + "interval": interval, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": client_id, + } + ), + ], + ) + + def assert_sleep_calls(self, patched_call, sleep_calls: tuple[int, ...]) -> None: + assert patched_call.call_count == len(sleep_calls) + patched_call.assert_has_calls([call(item) for item in sleep_calls]) + + def test_authorization_polls_expected_count( + self, responses: RequestsMock, auth: OAuth2DeviceCode + ) -> None: + device_code_auth, client_id = auth + self._configure_authorization_response(responses, client_id, 60, 5) + responder = TokenResponder(responses, self._token_url) + + # Two authorization_pending with 5s expiry + responder.configure_responses( + ("authorization_pending", "authorization_pending", "ok") + ) + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + with patch("time.sleep", return_value=None) as patched_sleep: + requests.get("http://authorized_only", auth=device_code_auth) + + responder.assert_all_calls() + self.assert_sleep_calls(patched_sleep, (5, 5)) + + def test_authorization_polls_expected_count_with_slow_down( + self, responses: RequestsMock, auth: OAuth2DeviceCode + ) -> None: + device_code_auth, client_id = auth + self._configure_authorization_response(responses, client_id, 60, 5) + responder = TokenResponder(responses, self._token_url) + + # Two authorization_pending with 5s expiry + responder.configure_responses(("authorization_pending", "slow_down", "ok")) + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + with patch("time.sleep", return_value=None) as patched_sleep: + requests.get("http://authorized_only", auth=device_code_auth) + + responder.assert_all_calls() + self.assert_sleep_calls(patched_sleep, (5, 10)) + + def test_authorization_slow_down_persists( + self, responses: RequestsMock, auth: OAuth2DeviceCode + ) -> None: + device_code_auth, client_id = auth + self._configure_authorization_response(responses, client_id, 60, 5) + responder = TokenResponder(responses, self._token_url) + + responder.configure_responses( + ( + "authorization_pending", + "slow_down", + "authorization_pending", + "authorization_pending", + "ok", + ) + ) + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + with patch("time.sleep", return_value=None) as patched_sleep: + requests.get("http://authorized_only", auth=device_code_auth) + + responder.assert_all_calls() + self.assert_sleep_calls(patched_sleep, (5, 10, 10, 10)) + + def test_authorization_times_out( + self, responses: RequestsMock, auth: OAuth2DeviceCode + ) -> None: + device_code_auth, client_id = auth + self._configure_authorization_response(responses, client_id, 9, 5) + responder = TokenResponder(responses, self._token_url) + + # Two authorization_pending with 5s expiry + responder.configure_responses( + ("authorization_pending", "authorization_pending") + ) + + with pytest.raises( + TimeoutOccurred, + match="User authentication was not received within 9 seconds.", + ): + requests.get("http://authorized_only", auth=device_code_auth) + + +class TestInvalidTokenRequest: + _authorization_url = "http://provide_code" + _token_url = "http://provide_device_code" + _client_id = "0d15afb1-2e83-487f-9d5a-fdd241d03db2" + _device_code = "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS" + + @pytest.fixture(scope="function") + def auth(self) -> OAuth2DeviceCode: + return requests_auth.OAuth2DeviceCode( + self._authorization_url, + self._token_url, + client_id=self._client_id, + ) + + def _configure_authorization_response(self, response_mock: RequestsMock) -> None: + response_mock.post( + "http://provide_code", + json={ + "device_code": self._device_code, + "user_code": "WDJB-MJHT", + "verification_uri": "https://example.com/device", + "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": self._client_id, + } + ), + ], + ) + + def _configure_token_error( + self, response_mock: RequestsMock, error_object: dict | str + ) -> None: + response_mock.post( + self._token_url, + json=error_object, + match=[ + urlencoded_params_matcher( + { + "grant_type": DEVICE_CODE_GRANT, + "device_code": self._device_code, + "client_id": self._client_id, + } + ), + ], + status=400, + ) + + def test_with_invalid_request( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authorization_response(responses) + self._configure_token_error(responses, {"error": "invalid_request"}) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert ( + str(exception_info.value) + == "invalid_request: The request is missing a required parameter, includes an " + "unsupported parameter value (other than grant type), repeats a parameter, " + "includes multiple credentials, utilizes more than one mechanism for " + "authenticating the client, or is otherwise malformed." + ) + + def test_with_invalid_request_and_description( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authorization_response(responses) + self._configure_token_error( + responses, + {"error": "invalid_request", "error_description": "desc of the error"}, + ) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert str(exception_info.value) == "invalid_request: desc of the error" + + def test_with_invalid_request_description_and_url( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authorization_response(responses) + self._configure_token_error( + responses, + { + "error": "invalid_request", + "error_description": "desc of the error", + "error_uri": "http://test_url", + }, + ) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert ( + str(exception_info.value) + == f"invalid_request: desc of the error\nMore information can be found on http://test_url" + ) + + def test_with_invalid_request_description_url_and_other_fields( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authorization_response(responses) + self._configure_token_error( + responses, + { + "error": "invalid_request", + "error_description": "desc of the error", + "error_uri": "http://test_url", + "other": "other info", + }, + ) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert ( + str(exception_info.value) + == "invalid_request: desc of the error\nMore information can be found on http://test_url\nAdditional information: {'other': 'other info'}" + ) + + def test_with_no_error_field( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authorization_response(responses) + self._configure_token_error(responses, {"other": "other info"}) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert str(exception_info.value) == "{'other': 'other info'}" + + def test_with_invalid_client_error_returns_default_error_text( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authorization_response(responses) + self._configure_token_error(responses, {"error": "invalid_client"}) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert ( + str(exception_info.value) + == "invalid_client: Client authentication failed (e.g., unknown client, no " + "client authentication included, or unsupported authentication method). The " + "authorization server MAY return an HTTP 401 (Unauthorized) status code to " + "indicate which HTTP authentication schemes are supported. If the client " + 'attempted to authenticate via the "Authorization" request header field, the ' + "authorization server MUST respond with an HTTP 401 (Unauthorized) status " + 'code and include the "WWW-Authenticate" response header field matching the ' + "authentication scheme used by the client." + ) + + def test_with_invalid_grant_error_returns_default_error_text( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authorization_response(responses) + self._configure_token_error(responses, {"error": "invalid_grant"}) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert ( + str(exception_info.value) + == "invalid_grant: The provided authorization grant (e.g., authorization code, " + "resource owner credentials) or refresh token is invalid, expired, revoked, " + "does not match the redirection URI used in the authorization request, or was " + "issued to another client." + ) + + def test_with_unauthorized_client_error_returns_default_error_text( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authorization_response(responses) + self._configure_token_error(responses, {"error": "unauthorized_client"}) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert ( + str(exception_info.value) + == "unauthorized_client: The authenticated client is not authorized to use this " + "authorization grant type." + ) + + def test_with_unsupported_grant_type_returns_default_error_text( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authorization_response(responses) + self._configure_token_error(responses, {"error": "unsupported_grant_type"}) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert ( + str(exception_info.value) + == "unsupported_grant_type: The authorization grant type is not supported by the " + "authorization server." + ) + + def test_with_invalid_scope_returns_default_error_text( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authorization_response(responses) + self._configure_token_error(responses, {"error": "invalid_scope"}) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert ( + str(exception_info.value) + == "invalid_scope: The requested scope is invalid, unknown, malformed, or " + "exceeds the scope granted by the resource owner." + ) + + +class TestInvalidAuthorizationRequest: + _authorization_url = "http://provide_code" + _token_url = "http://provide_device_code" + _client_id = "0d15afb1-2e83-487f-9d5a-fdd241d03db2" + + @pytest.fixture(scope="function") + def auth(self) -> OAuth2DeviceCode: + return requests_auth.OAuth2DeviceCode( + self._authorization_url, + self._token_url, + client_id=self._client_id, + ) + + def _configure_authentication_error( + self, responses: RequestsMock, error_object: dict + ) -> None: + responses.post( + self._authorization_url, + json=error_object, + match=[ + urlencoded_params_matcher( + { + "client_id": self._client_id, + } + ), + ], + status=400, + ) + + def test_invalid_request_with_no_details_gives_default_error( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ) -> None: + self._configure_authentication_error(responses, {"error": "invalid_request"}) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert ( + str(exception_info.value) + == "invalid_request: The request is missing a required parameter, includes an unsupported parameter value (other than grant type), repeats a parameter, includes multiple credentials, utilizes more than one mechanism for authenticating the client, or is otherwise malformed." + ) + + def test_invalid_request_with_error_description_gives_provided_description( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authentication_error( + responses, {"error": "invalid_request", "error_description": "desc"} + ) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert str(exception_info.value) == "invalid_request: desc" + + def test_invalid_request_with_error_description_and_url_provides_description_and_url( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authentication_error( + responses, + { + "error": "invalid_request", + "error_description": "desc", + "error_uri": "http://test_url", + }, + ) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert ( + str(exception_info.value) + == "invalid_request: desc\nMore information can be found on http://test_url" + ) + + def test_invalid_request_with_error_description_url_and_other_fields_provides_description_url_and_other_fields( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authentication_error( + responses, + { + "error": "invalid_request", + "error_description": "desc", + "error_uri": "http://test_url", + "other": ["test"], + }, + ) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert ( + str(exception_info.value) + == "invalid_request: desc\nMore information can be found on http://test_url\nAdditional information: {'other': ['test']}" + ) + + def test_unauthorized( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authentication_error( + responses, {"error": "unauthorized_client"} + ) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert ( + str(exception_info.value) + == "unauthorized_client: The authenticated client is not authorized to use this authorization grant type." + ) + + def test_with_invalid_scope( + self, responses: RequestsMock, auth: requests_auth.OAuth2DeviceCode + ): + self._configure_authentication_error(responses, {"error": "invalid_scope"}) + + with pytest.raises(requests_auth.InvalidGrantRequest) as exception_info: + requests.get("http://authorized_only", auth=auth) + + assert ( + str(exception_info.value) + == "invalid_scope: The requested scope is invalid, unknown, malformed, or exceeds the scope granted by the resource owner." + ) + + +def test_authorization_url_is_mandatory(): + with pytest.raises(Exception) as exception_info: + _ = requests_auth.OAuth2DeviceCode( + "", + "http://provide_device_code", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + ) + assert str(exception_info.value) == "Authorization URL is mandatory." + + +def test_token_url_is_mandatory(): + with pytest.raises(Exception) as exception_info: + _ = requests_auth.OAuth2DeviceCode( + "http://provide_code", "", client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2" + ) + assert str(exception_info.value) == "Token URL is mandatory." + + +def test_client_id_is_mandatory(): + with pytest.raises(Exception) as exception_info: + _ = requests_auth.OAuth2DeviceCode( + "http://provide_code", "http://provide_device_code", client_id="" + ) + assert str(exception_info.value) == "Client ID is mandatory." + + +def test_header_value_must_contains_token(): + with pytest.raises(Exception) as exception_info: + _ = requests_auth.OAuth2DeviceCode( + "http://provide_code", + "http://provide_device_code", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + header_value="Bearer token", + ) + assert str(exception_info.value) == "header_value parameter must contains {token}." From 042fa72a79450acf75778428189ef047960f2810 Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Sun, 28 Dec 2025 16:06:13 +0000 Subject: [PATCH 02/12] Add Auth0 device code class and update extra param handling --- requests_auth/__init__.py | 2 +- requests_auth/_oauth2/device_code.py | 54 +- .../test_oauth2_device_code_auth0.py | 568 ++++++++++++++++++ .../device_code/test_outh2_device_code.py | 59 +- 4 files changed, 631 insertions(+), 52 deletions(-) create mode 100644 tests/oauth2/device_code/test_oauth2_device_code_auth0.py diff --git a/requests_auth/__init__.py b/requests_auth/__init__.py index c9c9b27..21957f2 100644 --- a/requests_auth/__init__.py +++ b/requests_auth/__init__.py @@ -16,7 +16,7 @@ OAuth2AuthorizationCodePKCE, OktaAuthorizationCodePKCE, ) -from requests_auth._oauth2.device_code import OAuth2DeviceCode +from requests_auth._oauth2.device_code import OAuth2DeviceCode, Auth0DeviceCode from requests_auth._oauth2.client_credentials import ( OAuth2ClientCredentials, OktaClientCredentials, diff --git a/requests_auth/_oauth2/device_code.py b/requests_auth/_oauth2/device_code.py index ca04381..a90bafe 100644 --- a/requests_auth/_oauth2/device_code.py +++ b/requests_auth/_oauth2/device_code.py @@ -40,6 +40,10 @@ def __init__( Token will be sent as "Bearer {token}" by default. :param scope: Scope parameter sent to token URL as body. Can also be a list of scopes. Not sent by default. :param token_field_name: Field name containing the token. access_token by default. + :param authorization_pending_status_code: Status code returned if server is waiting for authorization. Default + value is 400 "Bad Request". + :param slow_down_status_code: Status code returned if the server requests the client wait longer between poll + requests. Default value is 400 "Bad Request". :param early_expiry: Number of seconds before actual token expiry where token will be considered as expired. Default to 30 seconds to ensure token will not expire between the time of retrieval and the time the request reaches the actual server. Set it to 0 to deactivate this feature and use the same token until actual expiry. @@ -63,6 +67,10 @@ def __init__( raise Exception("header_value parameter must contains {token}.") self.token_field_name = kwargs.pop("token_field_name", None) or "access_token" + self.authorization_pending_status_code = kwargs.pop( + "authorization_pending_status_code", 400 + ) + self.slow_down_status_code = kwargs.pop("slow_down_status_code", 400) self.early_expiry = float(kwargs.pop("early_expiry", None) or 30.0) self.prefer_complete_verification_url = kwargs.pop( "prefer_complete_verification_url", False @@ -81,7 +89,7 @@ def __init__( self.authorization_data["scope"] = ( " ".join(scope) if isinstance(scope, list) else scope ) - self.data = kwargs + self.authorization_data.update(kwargs) self.state = sha512( (self.authorization_url + self.token_url + self.client_id).encode( "unicode_escape" @@ -92,6 +100,8 @@ def __init__( self.refresh_data = {"grant_type": "refresh_token"} self.refresh_data.update(kwargs) + self.additional_data = kwargs + def __call__(self, r: requests.Request) -> requests.Request: token = OAuth2.token_cache.get_token( key=self.state, @@ -139,6 +149,7 @@ def request_new_token(self) -> tuple[str, str] | tuple[str, str, int, str]: "device_code": device_code, "client_id": self.client_id, } + token_request_data.update(self.additional_data) while time.time() - start_time < request_expires_in: token_response = self.session.post( self.token_url, @@ -163,14 +174,15 @@ def request_new_token(self) -> tuple[str, str] | tuple[str, str, int, str]: if token_expires_in is not None else (self.state, cast(str, content.get("access_token"))) ) - if token_response.status_code == 400: - # User has not authenticated, or something has gone wrong. There are two expected errors we could - # receive here which are normal. + if token_response.status_code == self.authorization_pending_status_code: error_content = _content_from_response(token_response) error_type = error_content.get("error", None) if error_type == "authorization_pending": time.sleep(interval) continue + if token_response.status_code == self.slow_down_status_code: + error_content = _content_from_response(token_response) + error_type = error_content.get("error", None) if error_type == "slow_down": interval += 5 time.sleep(interval) @@ -189,3 +201,37 @@ def refresh_token(self, refresh_token: str) -> tuple[str, str, int, str]: self.session, ) return self.state, token, expires_in, refresh_token + + +class Auth0DeviceCode(OAuth2DeviceCode): + """ + Describes an Auth0 (OAuth 2) Device code flow authentication request. + """ + + def __init__(self, domain: str, client_id: str, audience: str, **kwargs) -> None: + """ + :param domain: Auth0 domain, (like "https://org.eu.auth0.com") + :param client_id: Client ID + :param audience: API Audience, (like https://org-api-audience") + :param scope: Scope parameter sent in query. Can also be a list of scopes. Request 'openid' by default. + :param timeout: Maximum amount of seconds to wait for a token to be received once requested. + Wait for 1 minute by default. + :param prefer_complete_verification_url: If supported, return the complete verification URL to avoid the need + to enter the code. If false or not supported, the device code will be returned. + :param early_expiry: Number of seconds before actual token expiry where token will be considered as expired. + Default to 30 seconds to ensure token will not expire between the time of retrieval and the time the request + reaches the actual server. Set it to 0 to deactivate this feature and use the same token until actual expiry. + :param session: requests.Session instance that will be used to request the token. + Use it to provide a custom proxying rule for instance. + :param kwargs: all additional authorization parameters that should be put as query parameter in the token URL. + """ + stripped_domain = domain.rstrip("/") + super().__init__( + authorization_url=f"{stripped_domain}/oauth/device/code", + token_url=f"{stripped_domain}/oauth/token", + client_id=client_id, + audience=audience, + authorization_pending_status_code=403, + slow_down_status_code=403, + **kwargs, + ) diff --git a/tests/oauth2/device_code/test_oauth2_device_code_auth0.py b/tests/oauth2/device_code/test_oauth2_device_code_auth0.py new file mode 100644 index 0000000..dc1486d --- /dev/null +++ b/tests/oauth2/device_code/test_oauth2_device_code_auth0.py @@ -0,0 +1,568 @@ +import uuid +from unittest.mock import patch, call + +from responses import RequestsMock +from responses.matchers import urlencoded_params_matcher, header_matcher +import pytest +import requests + +import requests_auth +from requests_auth import Auth0DeviceCode, TimeoutOccurred +from requests_auth.testing import token_cache # noqa: F401 +from tests.oauth2.device_code.test_outh2_device_code import TokenResponder + +DEVICE_CODE_GRANT = "urn:ietf:params:oauth:grant-type:device_code" + + +def test_oauth2_device_code_flow_uses_provided_session( + token_cache, responses: RequestsMock +): + session = requests.Session() + session.headers.update({"x-test": "Test value"}) + auth = requests_auth.Auth0DeviceCode( + "http://provide_code", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + audience="api-audience", + session=session, + ) + responses.post( + "http://provide_code/oauth/device/code", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJB-MJHT", + "verification_uri": "https://example.com/device", + "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + "audience": "api-audience", + } + ), + header_matcher({"x-test": "Test value"}), + ], + ) + responses.post( + "http://provide_code/oauth/token", + json={ + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "read_data", + }, + match=[ + urlencoded_params_matcher( + { + "audience": "api-audience", + "grant_type": DEVICE_CODE_GRANT, + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + header_matcher({"x-test": "Test value"}), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_oauth2_device_code_flow_token_is_expired_after_30_seconds_by_default( + token_cache, responses: RequestsMock +): + auth = requests_auth.Auth0DeviceCode( + "http://provide_code", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + audience="api-audience", + ) + # Add a token that expires in 29 seconds, so should be considered as expired when issuing the request + token_cache._add_token( + key="ace95e9cce00f1807eaea783519b8b6db1f04304ca716898830285e05001cbd4db0a9efba441f42b29cd763b7f023daeff773b03b568f591e6ebfde17ec2b7b6", + token="2YotnFZFEjr1zCsicMWpAA", + expiry=requests_auth._oauth2.tokens._to_expiry(expires_in=29), + ) + # Meaning a new one will be requested + responses.post( + "http://provide_code/oauth/device/code", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJB-MJHT", + "verification_uri": "https://example.com/device", + "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + "audience": "api-audience", + } + ), + ], + ) + responses.post( + "http://provide_code/oauth/token", + json={ + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "read_data", + }, + match=[ + urlencoded_params_matcher( + { + "audience": "api-audience", + "grant_type": DEVICE_CODE_GRANT, + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_oauth2_device_code_flow_token_custom_expiry( + token_cache, responses: RequestsMock +): + auth = requests_auth.Auth0DeviceCode( + "http://provide_code", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + audience="api-audience", + early_expiry=28, + ) + # Add a token that expires in 29 seconds, so should be considered as not expired when issuing the request + token_cache._add_token( + key="ace95e9cce00f1807eaea783519b8b6db1f04304ca716898830285e05001cbd4db0a9efba441f42b29cd763b7f023daeff773b03b568f591e6ebfde17ec2b7b6", + token="2YotnFZFEjr1zCsicMWpAA", + expiry=requests_auth._oauth2.tokens._to_expiry(expires_in=29), + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_refresh_token(token_cache, responses: RequestsMock): + auth = requests_auth.Auth0DeviceCode( + "http://provide_code", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + audience="api-audience", + ) + # Setup initial authentication responses + responses.post( + "http://provide_code/oauth/device/code", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJB-MJHT", + "verification_uri": "https://example.com/device", + "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + "audience": "api-audience", + } + ), + ], + ) + responses.post( + "http://provide_code/oauth/token", + json={ + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 0, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "read_data", + }, + match=[ + urlencoded_params_matcher( + { + "audience": "api-audience", + "grant_type": DEVICE_CODE_GRANT, + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + # Setup auth handler + requests.get("http://authorized_only", auth=auth) + + # Response for refresh token grant + responses.post( + "http://provide_code/oauth/token", + json={ + "access_token": "rVR7Syg5bjZtZYjbZIW", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "read_data", + }, + match=[ + urlencoded_params_matcher( + { + "grant_type": "refresh_token", + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "audience": "api-audience", + } + ), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer rVR7Syg5bjZtZYjbZIW"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_refresh_token_invalid(token_cache, responses: RequestsMock): + auth = requests_auth.Auth0DeviceCode( + "http://provide_code", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + audience="api-audience", + ) + # Setup initial authentication responses + responses.post( + "http://provide_code/oauth/device/code", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJB-MJHT", + "verification_uri": "https://example.com/device", + "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + "audience": "api-audience", + } + ), + ], + ) + responses.post( + "http://provide_code/oauth/token", + json={ + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 0, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "read_data", + }, + match=[ + urlencoded_params_matcher( + { + "audience": "api-audience", + "grant_type": DEVICE_CODE_GRANT, + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + # response for refresh token grant + responses.post( + "http://provide_code/oauth/token", + json={"error": "invalid_request"}, + status=400, + match=[ + urlencoded_params_matcher( + { + "grant_type": "refresh_token", + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "audience": "api-audience", + } + ), + ], + ) + + # if refreshing the token fails, fallback to requesting a new token + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_refresh_token_access_token_not_expired(token_cache, responses: RequestsMock): + auth = requests_auth.Auth0DeviceCode( + "http://provide_code", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + audience="api-audience", + ) + # Setup initial authentication responses + responses.post( + "http://provide_code/oauth/device/code", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJB-MJHT", + "verification_uri": "https://example.com/device", + "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + "audience": "api-audience", + } + ), + ], + ) + responses.post( + "http://provide_code/oauth/token", + json={ + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "read_data", + }, + match=[ + urlencoded_params_matcher( + { + "audience": "api-audience", + "grant_type": DEVICE_CODE_GRANT, + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + # expect Bearer token to remain the same + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_empty_token_is_invalid(token_cache, responses: RequestsMock): + auth = requests_auth.Auth0DeviceCode( + "http://provide_code", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + audience="api-audience", + ) + # Setup initial authentication responses + responses.post( + "http://provide_code/oauth/device/code", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJB-MJHT", + "verification_uri": "https://example.com/device", + "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + "audience": "api-audience", + } + ), + ], + ) + responses.post( + "http://provide_code/oauth/token", + json={ + "access_token": "", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "read_data", + }, + ) + with pytest.raises(requests_auth.GrantNotProvided) as exception_info: + requests.get("http://authorized_only", auth=auth) + assert ( + str(exception_info.value) + == "access_token not provided within {'access_token': '', 'token_type': 'example', 'expires_in': 3600, 'refresh_token': 'tGzv3JOkF0XG5Qx2TlKWIA', 'scope': 'read_data'}." + ) + assert isinstance(exception_info.value, requests_auth.RequestsAuthException) + assert isinstance(exception_info.value, requests.RequestException) + + +class TestPollingBehaviour: + _domain = "http://provide_code" + _authorization_url = f"{_domain}/oauth/device/code" + _token_url = f"{_domain}/oauth/token" + _device_code = "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS" + _api_audience = "api-audience" + + @pytest.fixture(scope="function") + def auth(self) -> tuple[Auth0DeviceCode, str]: + client_id = str(uuid.uuid4()) + return ( + requests_auth.Auth0DeviceCode( + domain=self._domain, + client_id=client_id, + audience=self._api_audience, + ), + client_id, + ) + + def _configure_authorization_response( + self, + response_mock: RequestsMock, + client_id: str, + expires_in: int, + interval: int, + ) -> None: + response_mock.post( + "http://provide_code/oauth/device/code", + json={ + "device_code": self._device_code, + "user_code": "WDJB-MJHT", + "verification_uri": "https://example.com/device", + "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", + "expires_in": expires_in, + "interval": interval, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": client_id, + "audience": "api-audience", + } + ), + ], + ) + + def assert_sleep_calls(self, patched_call, sleep_calls: tuple[int, ...]) -> None: + assert patched_call.call_count == len(sleep_calls) + patched_call.assert_has_calls([call(item) for item in sleep_calls]) + + def test_authorization_polls_expected_count( + self, responses: RequestsMock, auth: Auth0DeviceCode + ) -> None: + device_code_auth, client_id = auth + self._configure_authorization_response(responses, client_id, 60, 5) + responder = TokenResponder(responses, self._token_url, status_code=403) + + # Two authorization_pending with 5s expiry + responder.configure_responses( + ("authorization_pending", "authorization_pending", "ok") + ) + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + with patch("time.sleep", return_value=None) as patched_sleep: + requests.get("http://authorized_only", auth=device_code_auth) + + responder.assert_all_calls() + self.assert_sleep_calls(patched_sleep, (5, 5)) + + def test_authorization_polls_expected_count_with_slow_down( + self, responses: RequestsMock, auth: Auth0DeviceCode + ) -> None: + device_code_auth, client_id = auth + self._configure_authorization_response(responses, client_id, 60, 5) + responder = TokenResponder(responses, self._token_url, status_code=403) + + # Two authorization_pending with 5s expiry + responder.configure_responses(("authorization_pending", "slow_down", "ok")) + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + with patch("time.sleep", return_value=None) as patched_sleep: + requests.get("http://authorized_only", auth=device_code_auth) + + responder.assert_all_calls() + self.assert_sleep_calls(patched_sleep, (5, 10)) + + def test_authorization_slow_down_persists( + self, responses: RequestsMock, auth: Auth0DeviceCode + ) -> None: + device_code_auth, client_id = auth + self._configure_authorization_response(responses, client_id, 60, 5) + responder = TokenResponder(responses, self._token_url, status_code=403) + + responder.configure_responses( + ( + "authorization_pending", + "slow_down", + "authorization_pending", + "authorization_pending", + "ok", + ) + ) + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + with patch("time.sleep", return_value=None) as patched_sleep: + requests.get("http://authorized_only", auth=device_code_auth) + + responder.assert_all_calls() + self.assert_sleep_calls(patched_sleep, (5, 10, 10, 10)) + + def test_authorization_times_out( + self, responses: RequestsMock, auth: Auth0DeviceCode + ) -> None: + device_code_auth, client_id = auth + self._configure_authorization_response(responses, client_id, 9, 5) + responder = TokenResponder(responses, self._token_url, status_code=403) + + # Two authorization_pending with 5s expiry + responder.configure_responses( + ("authorization_pending", "authorization_pending") + ) + + with pytest.raises( + TimeoutOccurred, + match="User authentication was not received within 9 seconds.", + ): + requests.get("http://authorized_only", auth=device_code_auth) diff --git a/tests/oauth2/device_code/test_outh2_device_code.py b/tests/oauth2/device_code/test_outh2_device_code.py index b100d4d..3d26b24 100644 --- a/tests/oauth2/device_code/test_outh2_device_code.py +++ b/tests/oauth2/device_code/test_outh2_device_code.py @@ -10,7 +10,7 @@ import requests_auth from requests_auth import OAuth2DeviceCode, TimeoutOccurred -from requests_auth.testing import BrowserMock, browser_mock, token_cache # noqa: F401 +from requests_auth.testing import token_cache # noqa: F401 DEVICE_CODE_GRANT = "urn:ietf:params:oauth:grant-type:device_code" @@ -73,47 +73,7 @@ def test_oauth2_device_code_flow_uses_provided_session( requests.get("http://authorized_only", auth=auth) -def test_oauth2_authorization_code_flow_get_code_is_sent_in_authorization_header_by_default( - token_cache, responses: RequestsMock, browser_mock: BrowserMock -): - auth = requests_auth.OAuth2AuthorizationCode( - "http://provide_code", "http://provide_access_token" - ) - tab = browser_mock.add_response( - opened_url="http://provide_code?response_type=code&state=163f0455b3e9cad3ca04254e5a0169553100d3aa0756c7964d897da316a695ffed5b4f46ef305094fd0a88cfe4b55ff257652015e4aa8f87b97513dba440f8de&redirect_uri=http%3A%2F%2Flocalhost%3A5000%2F", - reply_url="http://localhost:5000#code=SplxlOBeZQQYbYS6WxSbIA&state=163f0455b3e9cad3ca04254e5a0169553100d3aa0756c7964d897da316a695ffed5b4f46ef305094fd0a88cfe4b55ff257652015e4aa8f87b97513dba440f8de", - ) - responses.post( - "http://provide_access_token", - json={ - "access_token": "2YotnFZFEjr1zCsicMWpAA", - "token_type": "example", - "expires_in": 3600, - "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", - "example_parameter": "example_value", - }, - match=[ - urlencoded_params_matcher( - { - "grant_type": "authorization_code", - "redirect_uri": "http://localhost:5000/", - "response_type": "code", - "code": "SplxlOBeZQQYbYS6WxSbIA", - } - ), - ], - ) - responses.get( - "http://authorized_only", - match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], - ) - - requests.get("http://authorized_only", auth=auth) - - tab.assert_success() - - -def test_oauth2_authorization_code_flow_token_is_expired_after_30_seconds_by_default( +def test_oauth2_device_code_flow_token_is_expired_after_30_seconds_by_default( token_cache, responses: RequestsMock ): auth = requests_auth.OAuth2DeviceCode( @@ -174,7 +134,7 @@ def test_oauth2_authorization_code_flow_token_is_expired_after_30_seconds_by_def def test_oauth2_device_code_flow_token_custom_expiry( - token_cache, responses: RequestsMock, browser_mock: BrowserMock + token_cache, responses: RequestsMock ): auth = requests_auth.OAuth2DeviceCode( "http://provide_code", @@ -196,7 +156,7 @@ def test_oauth2_device_code_flow_token_custom_expiry( requests.get("http://authorized_only", auth=auth) -def test_refresh_token(token_cache, responses: RequestsMock, browser_mock: BrowserMock): +def test_refresh_token(token_cache, responses: RequestsMock): auth = requests_auth.OAuth2DeviceCode( "http://provide_code", "http://provide_device_code", @@ -456,7 +416,10 @@ def test_empty_token_is_invalid(token_cache, responses: RequestsMock): class TokenResponder: - def __init__(self, response_mock: RequestsMock, token_url: str): + def __init__( + self, response_mock: RequestsMock, token_url: str, status_code: int = 400 + ): + self._status_code = status_code self._mock = response_mock self._responses = [] self._token_url = token_url @@ -488,10 +451,12 @@ def configure_responses( for response_type in response_types: if response_type == "authorization_pending": self._responses.append( - {"status": 400, "error": "authorization_pending"} + {"status": self._status_code, "error": "authorization_pending"} ) elif response_type == "slow_down": - self._responses.append({"status": 400, "error": "slow_down"}) + self._responses.append( + {"status": self._status_code, "error": "slow_down"} + ) elif response_type == "ok": self._responses.append( { From 684f96b7d0099c2e462fcf519b2adbcccdae306e Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Sat, 21 Feb 2026 17:48:53 +0000 Subject: [PATCH 03/12] Update scopes handling and add prompt callback --- requests_auth/_oauth2/device_code.py | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/requests_auth/_oauth2/device_code.py b/requests_auth/_oauth2/device_code.py index a90bafe..94d8559 100644 --- a/requests_auth/_oauth2/device_code.py +++ b/requests_auth/_oauth2/device_code.py @@ -1,4 +1,5 @@ import time +import warnings from hashlib import sha512 from typing import cast @@ -13,6 +14,11 @@ _content_from_response, ) +def prompt_user_to_authenticate(verification_uri: str, user_code: str) -> None: + print("Device Code login request:") + print( + f"Navigate to {verification_uri} on any device and enter the device code: {user_code}" + ) class OAuth2DeviceCode(requests.auth.AuthBase, SupportMultiAuth): """ @@ -30,7 +36,7 @@ def __init__( :param token_url: OAuth 2 token URL. :param client_id: Resource owner username. :param timeout: Maximum amount of seconds to wait for a token to be received once requested. - Wait for 1 minute by default. + Wait for 3 minutes by default. :param prefer_complete_verification_url: If supported, return the complete verification URL to avoid the need to enter the code. If false or not supported, the device code will be returned. :param header_name: Name of the header field used to send token. @@ -49,6 +55,9 @@ def __init__( reaches the actual server. Set it to 0 to deactivate this feature and use the same token until actual expiry. :param session: requests.Session instance that will be used to request the token. Use it to provide a custom proxying rule for instance. + :param prompt_callback: A function that will be called with the verification_uri and user_code as parameters + once the authorization request has been made. By default, a print statement will be used to prompt the + user to authenticate with their browser. :param kwargs: all additional authorization parameters that should be put as query parameter in the token URL. """ self.authorization_url = authorization_url @@ -76,8 +85,10 @@ def __init__( "prefer_complete_verification_url", False ) + self.prompt_callback = kwargs.pop("prompt_callback", None) or prompt_user_to_authenticate + # Time is expressed in seconds - self.timeout = int(kwargs.pop("timeout", None) or 60) + self.timeout = int(kwargs.pop("timeout", None) or 180) self.session = kwargs.pop("session", None) or requests.Session() self.session.timeout = self.timeout @@ -139,10 +150,8 @@ def request_new_token(self) -> tuple[str, str] | tuple[str, str, int, str]: interval = response_data.get("interval", 5) start_time = time.time() - print("Device Code login request:") - print( - f"Navigate to {verification_uri} on any device and enter the device code: {user_code}" - ) + + self.prompt_callback(verification_uri, user_code) token_request_data = { "grant_type": "urn:ietf:params:oauth:grant-type:device_code", @@ -215,7 +224,7 @@ def __init__(self, domain: str, client_id: str, audience: str, **kwargs) -> None :param audience: API Audience, (like https://org-api-audience") :param scope: Scope parameter sent in query. Can also be a list of scopes. Request 'openid' by default. :param timeout: Maximum amount of seconds to wait for a token to be received once requested. - Wait for 1 minute by default. + Wait for 3 minutes by default. :param prefer_complete_verification_url: If supported, return the complete verification URL to avoid the need to enter the code. If false or not supported, the device code will be returned. :param early_expiry: Number of seconds before actual token expiry where token will be considered as expired. @@ -226,6 +235,8 @@ def __init__(self, domain: str, client_id: str, audience: str, **kwargs) -> None :param kwargs: all additional authorization parameters that should be put as query parameter in the token URL. """ stripped_domain = domain.rstrip("/") + scopes = kwargs.pop("scope", "openid") + kwargs["scope"] = " ".join(scopes) if isinstance(scopes, list) else scopes super().__init__( authorization_url=f"{stripped_domain}/oauth/device/code", token_url=f"{stripped_domain}/oauth/token", From 2945cbb6ec6c830c2923b4c588675bdc6a4a8efc Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Sat, 21 Feb 2026 17:49:29 +0000 Subject: [PATCH 04/12] Add EntraID Device Code Flow --- requests_auth/__init__.py | 4 +++- requests_auth/_oauth2/device_code.py | 32 ++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/requests_auth/__init__.py b/requests_auth/__init__.py index 21957f2..3c606dc 100644 --- a/requests_auth/__init__.py +++ b/requests_auth/__init__.py @@ -16,7 +16,7 @@ OAuth2AuthorizationCodePKCE, OktaAuthorizationCodePKCE, ) -from requests_auth._oauth2.device_code import OAuth2DeviceCode, Auth0DeviceCode +from requests_auth._oauth2.device_code import OAuth2DeviceCode, Auth0DeviceCode, EntraIDDeviceCode from requests_auth._oauth2.client_credentials import ( OAuth2ClientCredentials, OktaClientCredentials, @@ -54,6 +54,8 @@ "OAuth2AuthorizationCodePKCE", "OktaAuthorizationCodePKCE", "OAuth2DeviceCode", + "Auth0DeviceCode", + "EntraIDDeviceCode", "OAuth2Implicit", "OktaImplicit", "OktaImplicitIdToken", diff --git a/requests_auth/_oauth2/device_code.py b/requests_auth/_oauth2/device_code.py index 94d8559..dc0348e 100644 --- a/requests_auth/_oauth2/device_code.py +++ b/requests_auth/_oauth2/device_code.py @@ -246,3 +246,35 @@ def __init__(self, domain: str, client_id: str, audience: str, **kwargs) -> None slow_down_status_code=403, **kwargs, ) + +class EntraIDDeviceCode(OAuth2DeviceCode): + """ + Describes an Entra ID (OAuth 2) Device code flow authentication request. + """ + + def __init__(self, tenant_id: str, client_id: str, **kwargs) -> None: + """ + :param tenant_id: Entra ID tenant ID, (like "00000000-0000-0000-0000-000000000000") + :param client_id: Client ID + :param scope: Scope parameter sent in query. Can also be a list of scopes. Request 'openid' by default. + :param timeout: Maximum amount of seconds to wait for a token to be received once requested. + Wait for 3 minutes by default. + :param early_expiry: Number of seconds before actual token expiry where token will be considered as expired. + Default to 30 seconds to ensure token will not expire between the time of retrieval and the time the request + reaches the actual server. Set it to 0 to deactivate this feature and use the same token until actual expiry. + :param session: requests.Session instance that will be used to request the token. + Use it to provide a custom proxying rule for instance. + :param kwargs: all additional authorization parameters that should be put as query parameter in the token URL. + """ + if "prefer_complete_verification_url" in kwargs: + warnings.warn("prefer_complete_verification_url parameter is not supported by Microsoft Entra ID and will be ignored.") + scopes = kwargs.pop("scope", "openid") + kwargs["scope"] = " ".join(scopes) if isinstance(scopes, list) else scopes + super().__init__( + authorization_url=f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/devicecode", + token_url=f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token", + client_id=client_id, + authorization_pending_status_code=400, + slow_down_status_code=400, + **kwargs, + ) \ No newline at end of file From ea21140b9e571aae432ca8752508b9ccdd0d3941 Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Sat, 21 Feb 2026 17:49:46 +0000 Subject: [PATCH 05/12] Update scopes handling in Auth0 --- .../test_oauth2_device_code_auth0.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/tests/oauth2/device_code/test_oauth2_device_code_auth0.py b/tests/oauth2/device_code/test_oauth2_device_code_auth0.py index dc1486d..04ba1a5 100644 --- a/tests/oauth2/device_code/test_oauth2_device_code_auth0.py +++ b/tests/oauth2/device_code/test_oauth2_device_code_auth0.py @@ -40,6 +40,7 @@ def test_oauth2_device_code_flow_uses_provided_session( { "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", "audience": "api-audience", + "scope": "openid", } ), header_matcher({"x-test": "Test value"}), @@ -104,6 +105,7 @@ def test_oauth2_device_code_flow_token_is_expired_after_30_seconds_by_default( { "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", "audience": "api-audience", + "scope": "openid", } ), ], @@ -181,6 +183,7 @@ def test_refresh_token(token_cache, responses: RequestsMock): { "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", "audience": "api-audience", + "scope": "openid", } ), ], @@ -192,7 +195,7 @@ def test_refresh_token(token_cache, responses: RequestsMock): "token_type": "example", "expires_in": 0, "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", - "scope": "read_data", + "scope": "openid", }, match=[ urlencoded_params_matcher( @@ -221,7 +224,7 @@ def test_refresh_token(token_cache, responses: RequestsMock): "token_type": "example", "expires_in": 3600, "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", - "scope": "read_data", + "scope": "openid", }, match=[ urlencoded_params_matcher( @@ -263,6 +266,7 @@ def test_refresh_token_invalid(token_cache, responses: RequestsMock): { "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", "audience": "api-audience", + "scope": "openid", } ), ], @@ -274,7 +278,7 @@ def test_refresh_token_invalid(token_cache, responses: RequestsMock): "token_type": "example", "expires_in": 0, "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", - "scope": "read_data", + "scope": "openid", }, match=[ urlencoded_params_matcher( @@ -341,6 +345,7 @@ def test_refresh_token_access_token_not_expired(token_cache, responses: Requests { "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", "audience": "api-audience", + "scope": "openid", } ), ], @@ -352,7 +357,7 @@ def test_refresh_token_access_token_not_expired(token_cache, responses: Requests "token_type": "example", "expires_in": 3600, "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", - "scope": "read_data", + "scope": "openid", }, match=[ urlencoded_params_matcher( @@ -403,6 +408,7 @@ def test_empty_token_is_invalid(token_cache, responses: RequestsMock): { "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", "audience": "api-audience", + "scope": "openid", } ), ], @@ -414,14 +420,14 @@ def test_empty_token_is_invalid(token_cache, responses: RequestsMock): "token_type": "example", "expires_in": 3600, "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", - "scope": "read_data", + "scope": "openid", }, ) with pytest.raises(requests_auth.GrantNotProvided) as exception_info: requests.get("http://authorized_only", auth=auth) assert ( str(exception_info.value) - == "access_token not provided within {'access_token': '', 'token_type': 'example', 'expires_in': 3600, 'refresh_token': 'tGzv3JOkF0XG5Qx2TlKWIA', 'scope': 'read_data'}." + == "access_token not provided within {'access_token': '', 'token_type': 'example', 'expires_in': 3600, 'refresh_token': 'tGzv3JOkF0XG5Qx2TlKWIA', 'scope': 'openid'}." ) assert isinstance(exception_info.value, requests_auth.RequestsAuthException) assert isinstance(exception_info.value, requests.RequestException) @@ -468,6 +474,7 @@ def _configure_authorization_response( { "client_id": client_id, "audience": "api-audience", + "scope": "openid", } ), ], From 9eeb6dd8f25182cf07efd5de5e338254208cebc3 Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Sat, 21 Feb 2026 17:49:55 +0000 Subject: [PATCH 06/12] Add EntraID tests --- .../test_oauth2_device_code_entraid.py | 546 ++++++++++++++++++ 1 file changed, 546 insertions(+) create mode 100644 tests/oauth2/device_code/test_oauth2_device_code_entraid.py diff --git a/tests/oauth2/device_code/test_oauth2_device_code_entraid.py b/tests/oauth2/device_code/test_oauth2_device_code_entraid.py new file mode 100644 index 0000000..f88d2c8 --- /dev/null +++ b/tests/oauth2/device_code/test_oauth2_device_code_entraid.py @@ -0,0 +1,546 @@ +import uuid +from unittest.mock import patch, call + +from responses import RequestsMock +from responses.matchers import urlencoded_params_matcher, header_matcher +import pytest +import requests + +import requests_auth +from requests_auth import EntraIDDeviceCode, TimeoutOccurred +from requests_auth.testing import token_cache # noqa: F401 +from tests.oauth2.device_code.test_outh2_device_code import TokenResponder + +DEVICE_CODE_GRANT = "urn:ietf:params:oauth:grant-type:device_code" + + +def test_oauth2_device_code_flow_uses_provided_session( + token_cache, responses: RequestsMock +): + session = requests.Session() + session.headers.update({"x-test": "Test value"}) + auth = requests_auth.EntraIDDeviceCode( + "6d655498-2fe7-413e-9443-4f108184be38", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + session=session, + ) + responses.post( + "https://login.microsoftonline.com/6d655498-2fe7-413e-9443-4f108184be38/oauth2/v2.0/devicecode", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJBMJHT", + "verification_uri": "https://example.com/device", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + "scope": "openid" + } + ), + header_matcher({"x-test": "Test value"}), + ], + ) + responses.post( + "https://login.microsoftonline.com/6d655498-2fe7-413e-9443-4f108184be38/oauth2/v2.0/token", + json={ + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "openid", + }, + match=[ + urlencoded_params_matcher( + { + "grant_type": DEVICE_CODE_GRANT, + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + header_matcher({"x-test": "Test value"}), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_oauth2_device_code_flow_token_is_expired_after_30_seconds_by_default( + token_cache, responses: RequestsMock +): + auth = requests_auth.EntraIDDeviceCode( + "6d655498-2fe7-413e-9443-4f108184be38", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + ) + # Add a token that expires in 29 seconds, so should be considered as expired when issuing the request + token_cache._add_token( + key="ace95e9cce00f1807eaea783519b8b6db1f04304ca716898830285e05001cbd4db0a9efba441f42b29cd763b7f023daeff773b03b568f591e6ebfde17ec2b7b6", + token="2YotnFZFEjr1zCsicMWpAA", + expiry=requests_auth._oauth2.tokens._to_expiry(expires_in=29), + ) + # Meaning a new one will be requested + responses.post( + "https://login.microsoftonline.com/6d655498-2fe7-413e-9443-4f108184be38/oauth2/v2.0/devicecode", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJBMJHT", + "verification_uri": "https://example.com/device", + "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + "scope": "openid", + } + ), + ], + ) + responses.post( + "https://login.microsoftonline.com/6d655498-2fe7-413e-9443-4f108184be38/oauth2/v2.0/token", + json={ + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "openid", + }, + match=[ + urlencoded_params_matcher( + { + "grant_type": DEVICE_CODE_GRANT, + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_oauth2_device_code_flow_token_custom_expiry( + token_cache, responses: RequestsMock +): + auth = requests_auth.EntraIDDeviceCode( + "6d655498-2fe7-413e-9443-4f108184be38", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + early_expiry=28, + ) + # Add a token that expires in 29 seconds, so should be considered as not expired when issuing the request + token_cache._add_token( + key="ba2535d60bf38bde662eba49add795cc707c5f247b36d198fc98bc2dd82b9f1736ce63a27442ee4761982d739ab67b7ad92efec3dd8b04f613766904a0ad7e20", + token="2YotnFZFEjr1zCsicMWpAA", + expiry=requests_auth._oauth2.tokens._to_expiry(expires_in=29), + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_refresh_token(token_cache, responses: RequestsMock): + auth = requests_auth.EntraIDDeviceCode( + "6d655498-2fe7-413e-9443-4f108184be38", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + ) + # Setup initial authentication responses + responses.post( + "https://login.microsoftonline.com/6d655498-2fe7-413e-9443-4f108184be38/oauth2/v2.0/devicecode", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJBMJHT", + "verification_uri": "https://example.com/device", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + "scope": "openid", + } + ), + ], + ) + responses.post( + "https://login.microsoftonline.com/6d655498-2fe7-413e-9443-4f108184be38/oauth2/v2.0/token", + json={ + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 0, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "openid", + }, + match=[ + urlencoded_params_matcher( + { + "grant_type": DEVICE_CODE_GRANT, + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + # Setup auth handler + requests.get("http://authorized_only", auth=auth) + + # Response for refresh token grant + responses.post( + "https://login.microsoftonline.com/6d655498-2fe7-413e-9443-4f108184be38/oauth2/v2.0/token", + json={ + "access_token": "rVR7Syg5bjZtZYjbZIW", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "openid", + }, + match=[ + urlencoded_params_matcher( + { + "grant_type": "refresh_token", + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + } + ), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer rVR7Syg5bjZtZYjbZIW"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_refresh_token_invalid(token_cache, responses: RequestsMock): + auth = requests_auth.EntraIDDeviceCode( + "6d655498-2fe7-413e-9443-4f108184be38", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + ) + # Setup initial authentication responses + responses.post( + "https://login.microsoftonline.com/6d655498-2fe7-413e-9443-4f108184be38/oauth2/v2.0/devicecode", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJBMJHT", + "verification_uri": "https://example.com/device", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + "scope": "openid", + } + ), + ], + ) + responses.post( + "https://login.microsoftonline.com/6d655498-2fe7-413e-9443-4f108184be38/oauth2/v2.0/token", + json={ + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 0, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "openid", + }, + match=[ + urlencoded_params_matcher( + { + "grant_type": DEVICE_CODE_GRANT, + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + # response for refresh token grant + responses.post( + "https://login.microsoftonline.com/6d655498-2fe7-413e-9443-4f108184be38/oauth2/v2.0/token", + json={"error": "invalid_request"}, + status=400, + match=[ + urlencoded_params_matcher( + { + "grant_type": "refresh_token", + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + } + ), + ], + ) + + # if refreshing the token fails, fallback to requesting a new token + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_refresh_token_access_token_not_expired(token_cache, responses: RequestsMock): + auth = requests_auth.EntraIDDeviceCode( + "6d655498-2fe7-413e-9443-4f108184be38", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + ) + # Setup initial authentication responses + responses.post( + "https://login.microsoftonline.com/6d655498-2fe7-413e-9443-4f108184be38/oauth2/v2.0/devicecode", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJBMJHT", + "verification_uri": "https://example.com/device", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + "scope": "openid", + } + ), + ], + ) + responses.post( + "https://login.microsoftonline.com/6d655498-2fe7-413e-9443-4f108184be38/oauth2/v2.0/token", + json={ + "access_token": "2YotnFZFEjr1zCsicMWpAA", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "openid", + }, + match=[ + urlencoded_params_matcher( + { + "grant_type": DEVICE_CODE_GRANT, + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + } + ), + ], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + # expect Bearer token to remain the same + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + requests.get("http://authorized_only", auth=auth) + + +def test_empty_token_is_invalid(token_cache, responses: RequestsMock): + auth = requests_auth.EntraIDDeviceCode( + "6d655498-2fe7-413e-9443-4f108184be38", + client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + ) + # Setup initial authentication responses + responses.post( + "https://login.microsoftonline.com/6d655498-2fe7-413e-9443-4f108184be38/oauth2/v2.0/devicecode", + json={ + "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", + "user_code": "WDJBMJHT", + "verification_uri": "https://example.com/device", + "expires_in": 1800, + "interval": 5, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", + "scope": "openid", + } + ), + ], + ) + responses.post( + "https://login.microsoftonline.com/6d655498-2fe7-413e-9443-4f108184be38/oauth2/v2.0/token", + json={ + "access_token": "", + "token_type": "example", + "expires_in": 3600, + "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", + "scope": "openid", + }, + ) + with pytest.raises(requests_auth.GrantNotProvided) as exception_info: + requests.get("http://authorized_only", auth=auth) + assert ( + str(exception_info.value) + == "access_token not provided within {'access_token': '', 'token_type': 'example', 'expires_in': 3600, 'refresh_token': 'tGzv3JOkF0XG5Qx2TlKWIA', 'scope': 'openid'}." + ) + assert isinstance(exception_info.value, requests_auth.RequestsAuthException) + assert isinstance(exception_info.value, requests.RequestException) + + +class TestPollingBehaviour: + _tenant_id = "6d655498-2fe7-413e-9443-4f108184be38" + _authorization_url = f"https://login.microsoftonline.com/{_tenant_id}/oauth2/v2.0/devicecode" + _token_url = f"https://login.microsoftonline.com/{_tenant_id}/oauth2/v2.0/token" + _device_code = "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS" + + @pytest.fixture(scope="function") + def auth(self) -> tuple[EntraIDDeviceCode, str]: + client_id = str(uuid.uuid4()) + return ( + requests_auth.EntraIDDeviceCode( + tenant_id=self._tenant_id, + client_id=client_id, + ), + client_id, + ) + + def _configure_authorization_response( + self, + response_mock: RequestsMock, + client_id: str, + expires_in: int, + interval: int, + ) -> None: + response_mock.post( + self._authorization_url, + json={ + "device_code": self._device_code, + "user_code": "WDJBMJHT", + "verification_uri": "https://example.com/device", + "expires_in": expires_in, + "interval": interval, + }, + match=[ + urlencoded_params_matcher( + { + "client_id": client_id, + "scope": "openid", + } + ), + ], + ) + + def assert_sleep_calls(self, patched_call, sleep_calls: tuple[int, ...]) -> None: + assert patched_call.call_count == len(sleep_calls) + patched_call.assert_has_calls([call(item) for item in sleep_calls]) + + def test_authorization_polls_expected_count( + self, responses: RequestsMock, auth: EntraIDDeviceCode + ) -> None: + device_code_auth, client_id = auth + self._configure_authorization_response(responses, client_id, 60, 5) + responder = TokenResponder(responses, self._token_url, status_code=400) + + # Two authorization_pending with 5s expiry + responder.configure_responses( + ("authorization_pending", "authorization_pending", "ok") + ) + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + with patch("time.sleep", return_value=None) as patched_sleep: + requests.get("http://authorized_only", auth=device_code_auth) + + responder.assert_all_calls() + self.assert_sleep_calls(patched_sleep, (5, 5)) + + def test_authorization_polls_expected_count_with_slow_down( + self, responses: RequestsMock, auth: EntraIDDeviceCode + ) -> None: + device_code_auth, client_id = auth + self._configure_authorization_response(responses, client_id, 60, 5) + responder = TokenResponder(responses, self._token_url, status_code=400) + + # Two authorization_pending with 5s expiry + responder.configure_responses(("authorization_pending", "slow_down", "ok")) + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + with patch("time.sleep", return_value=None) as patched_sleep: + requests.get("http://authorized_only", auth=device_code_auth) + + responder.assert_all_calls() + self.assert_sleep_calls(patched_sleep, (5, 10)) + + def test_authorization_slow_down_persists( + self, responses: RequestsMock, auth: EntraIDDeviceCode + ) -> None: + device_code_auth, client_id = auth + self._configure_authorization_response(responses, client_id, 60, 5) + responder = TokenResponder(responses, self._token_url, status_code=400) + + responder.configure_responses( + ( + "authorization_pending", + "slow_down", + "authorization_pending", + "authorization_pending", + "ok", + ) + ) + + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + ) + + with patch("time.sleep", return_value=None) as patched_sleep: + requests.get("http://authorized_only", auth=device_code_auth) + + responder.assert_all_calls() + self.assert_sleep_calls(patched_sleep, (5, 10, 10, 10)) + + def test_authorization_times_out( + self, responses: RequestsMock, auth: EntraIDDeviceCode + ) -> None: + device_code_auth, client_id = auth + self._configure_authorization_response(responses, client_id, 9, 5) + responder = TokenResponder(responses, self._token_url, status_code=400) + + # Two authorization_pending with 5s expiry + responder.configure_responses( + ("authorization_pending", "authorization_pending") + ) + + with pytest.raises( + TimeoutOccurred, + match="User authentication was not received within 9 seconds.", + ): + requests.get("http://authorized_only", auth=device_code_auth) From c4342c33952bd702a8a0def7ad47da174641414b Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Sat, 21 Feb 2026 17:50:08 +0000 Subject: [PATCH 07/12] Refactor device code tests --- .../device_code/test_outh2_device_code.py | 308 ++++++++++-------- 1 file changed, 168 insertions(+), 140 deletions(-) diff --git a/tests/oauth2/device_code/test_outh2_device_code.py b/tests/oauth2/device_code/test_outh2_device_code.py index 3d26b24..7e7d96b 100644 --- a/tests/oauth2/device_code/test_outh2_device_code.py +++ b/tests/oauth2/device_code/test_outh2_device_code.py @@ -14,6 +14,53 @@ DEVICE_CODE_GRANT = "urn:ietf:params:oauth:grant-type:device_code" +# Common test constants +CLIENT_ID = "0d15afb1-2e83-487f-9d5a-fdd241d03db2" +DEVICE_CODE = "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS" +USER_CODE = "WDJB-MJHT" +VERIFICATION_URI = "https://example.com/device" +VERIFICATION_URI_COMPLETE = f"{VERIFICATION_URI}?user_code={USER_CODE}" +ACCESS_TOKEN = "2YotnFZFEjr1zCsicMWpAA" +REFRESH_TOKEN = "tGzv3JOkF0XG5Qx2TlKWIA" +TOKEN_TYPE = "example" +SCOPE = "read_data" +AUTHORIZATION_URL = "http://provide_code" +TOKEN_URL = "http://provide_device_code" + + +def make_authorization_response( + device_code=DEVICE_CODE, + user_code=USER_CODE, + verification_uri=VERIFICATION_URI, + verification_uri_complete=VERIFICATION_URI_COMPLETE, + expires_in=1800, + interval=5, +): + return { + "device_code": device_code, + "user_code": user_code, + "verification_uri": verification_uri, + "verification_uri_complete": verification_uri_complete, + "expires_in": expires_in, + "interval": interval, + } + + +def make_token_response( + access_token=ACCESS_TOKEN, + token_type=TOKEN_TYPE, + expires_in=3600, + refresh_token=REFRESH_TOKEN, + scope=SCOPE, +): + return { + "access_token": access_token, + "token_type": token_type, + "expires_in": expires_in, + "refresh_token": refresh_token, + "scope": scope, + } + def test_oauth2_device_code_flow_uses_provided_session( token_cache, responses: RequestsMock @@ -21,113 +68,68 @@ def test_oauth2_device_code_flow_uses_provided_session( session = requests.Session() session.headers.update({"x-test": "Test value"}) auth = requests_auth.OAuth2DeviceCode( - "http://provide_code", - "http://provide_device_code", - client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + AUTHORIZATION_URL, + TOKEN_URL, + client_id=CLIENT_ID, session=session, ) responses.post( - "http://provide_code", - json={ - "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", - "user_code": "WDJB-MJHT", - "verification_uri": "https://example.com/device", - "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", - "expires_in": 1800, - "interval": 5, - }, + AUTHORIZATION_URL, + json=make_authorization_response(), match=[ - urlencoded_params_matcher( - { - "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", - } - ), + urlencoded_params_matcher({"client_id": CLIENT_ID}), header_matcher({"x-test": "Test value"}), ], ) responses.post( - "http://provide_device_code", - json={ - "access_token": "2YotnFZFEjr1zCsicMWpAA", - "token_type": "example", - "expires_in": 3600, - "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", - "scope": "read_data", - }, + TOKEN_URL, + json=make_token_response(), match=[ - urlencoded_params_matcher( - { - "grant_type": DEVICE_CODE_GRANT, - "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", - "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", - } - ), + urlencoded_params_matcher({ + "grant_type": DEVICE_CODE_GRANT, + "device_code": DEVICE_CODE, + "client_id": CLIENT_ID, + }), header_matcher({"x-test": "Test value"}), ], ) responses.get( "http://authorized_only", - match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + match=[header_matcher({"Authorization": f"Bearer {ACCESS_TOKEN}"})], ) - requests.get("http://authorized_only", auth=auth) -def test_oauth2_device_code_flow_token_is_expired_after_30_seconds_by_default( - token_cache, responses: RequestsMock -): + + +def test_oauth2_device_code_flow_token_is_expired_after_30_seconds_by_default(token_cache, responses: RequestsMock): auth = requests_auth.OAuth2DeviceCode( - "http://provide_code", - "http://provide_device_code", - client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + AUTHORIZATION_URL, + TOKEN_URL, + client_id=CLIENT_ID, ) - # Add a token that expires in 29 seconds, so should be considered as expired when issuing the request token_cache._add_token( key="5e668eeebaca3355b71f27143ebb2972f9bb6839844dc4ae0c519a1ac4e8fdb132c6ad864169e2b6e2aee9f317661fc5f3e7a2ee066284d468fb54c44bf29682", - token="2YotnFZFEjr1zCsicMWpAA", + token=ACCESS_TOKEN, expiry=requests_auth._oauth2.tokens._to_expiry(expires_in=29), ) - # Meaning a new one will be requested responses.post( - "http://provide_code", - json={ - "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", - "user_code": "WDJB-MJHT", - "verification_uri": "https://example.com/device", - "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", - "expires_in": 1800, - "interval": 5, - }, - match=[ - urlencoded_params_matcher( - { - "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", - } - ), - ], + AUTHORIZATION_URL, + json=make_authorization_response(), + match=[urlencoded_params_matcher({"client_id": CLIENT_ID})], ) responses.post( - "http://provide_device_code", - json={ - "access_token": "2YotnFZFEjr1zCsicMWpAA", - "token_type": "example", - "expires_in": 3600, - "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", - "scope": "read_data", - }, - match=[ - urlencoded_params_matcher( - { - "grant_type": DEVICE_CODE_GRANT, - "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", - "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", - } - ), - ], + TOKEN_URL, + json=make_token_response(), + match=[urlencoded_params_matcher({ + "grant_type": DEVICE_CODE_GRANT, + "device_code": DEVICE_CODE, + "client_id": CLIENT_ID, + })], ) responses.get( "http://authorized_only", - match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + match=[header_matcher({"Authorization": f"Bearer {ACCESS_TOKEN}"})], ) requests.get("http://authorized_only", auth=auth) @@ -137,20 +139,19 @@ def test_oauth2_device_code_flow_token_custom_expiry( token_cache, responses: RequestsMock ): auth = requests_auth.OAuth2DeviceCode( - "http://provide_code", + AUTHORIZATION_URL, "http://provide_access_token", - client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + client_id=CLIENT_ID, early_expiry=28, ) - # Add a token that expires in 29 seconds, so should be considered as not expired when issuing the request token_cache._add_token( key="5e668eeebaca3355b71f27143ebb2972f9bb6839844dc4ae0c519a1ac4e8fdb132c6ad864169e2b6e2aee9f317661fc5f3e7a2ee066284d468fb54c44bf29682", - token="2YotnFZFEjr1zCsicMWpAA", + token=ACCESS_TOKEN, expiry=requests_auth._oauth2.tokens._to_expiry(expires_in=29), ) responses.get( "http://authorized_only", - match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + match=[header_matcher({"Authorization": f"Bearer {ACCESS_TOKEN}"})], ) requests.get("http://authorized_only", auth=auth) @@ -158,74 +159,36 @@ def test_oauth2_device_code_flow_token_custom_expiry( def test_refresh_token(token_cache, responses: RequestsMock): auth = requests_auth.OAuth2DeviceCode( - "http://provide_code", - "http://provide_device_code", - client_id="0d15afb1-2e83-487f-9d5a-fdd241d03db2", + AUTHORIZATION_URL, + TOKEN_URL, + client_id=CLIENT_ID, ) - # Setup initial authentication responses responses.post( - "http://provide_code", - json={ - "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", - "user_code": "WDJB-MJHT", - "verification_uri": "https://example.com/device", - "verification_uri_complete": "https://example.com/device?user_code=WDJB-MJHT", - "expires_in": 1800, - "interval": 5, - }, - match=[ - urlencoded_params_matcher( - { - "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", - } - ), - ], + AUTHORIZATION_URL, + json=make_authorization_response(), + match=[urlencoded_params_matcher({"client_id": CLIENT_ID})], ) responses.post( - "http://provide_device_code", - json={ - "access_token": "2YotnFZFEjr1zCsicMWpAA", - "token_type": "example", - "expires_in": 0, - "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", - "scope": "read_data", - }, - match=[ - urlencoded_params_matcher( - { - "grant_type": DEVICE_CODE_GRANT, - "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", - "client_id": "0d15afb1-2e83-487f-9d5a-fdd241d03db2", - } - ), - ], + TOKEN_URL, + json=make_token_response(expires_in=0), + match=[urlencoded_params_matcher({ + "grant_type": DEVICE_CODE_GRANT, + "device_code": DEVICE_CODE, + "client_id": CLIENT_ID, + })], ) responses.get( "http://authorized_only", - match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})], + match=[header_matcher({"Authorization": f"Bearer {ACCESS_TOKEN}"})], ) - - # Setup auth handler requests.get("http://authorized_only", auth=auth) - - # Response for refresh token grant responses.post( - "http://provide_device_code", - json={ - "access_token": "rVR7Syg5bjZtZYjbZIW", - "token_type": "example", - "expires_in": 3600, - "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", - "scope": "read_data", - }, - match=[ - urlencoded_params_matcher( - { - "grant_type": "refresh_token", - "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", - } - ) - ], + TOKEN_URL, + json=make_token_response(access_token="rVR7Syg5bjZtZYjbZIW"), + match=[urlencoded_params_matcher({ + "grant_type": "refresh_token", + "refresh_token": REFRESH_TOKEN, + })], ) responses.get( "http://authorized_only", @@ -989,3 +952,68 @@ def test_header_value_must_contains_token(): header_value="Bearer token", ) assert str(exception_info.value) == "header_value parameter must contains {token}." + + +def test_oauth2_device_code_flow_prints_verification_url_and_user_code_to_stdout(token_cache, responses: RequestsMock, capsys): + auth = requests_auth.OAuth2DeviceCode( + AUTHORIZATION_URL, + TOKEN_URL, + client_id=CLIENT_ID, + ) + responses.post( + AUTHORIZATION_URL, + json=make_authorization_response(), + match=[urlencoded_params_matcher({"client_id": CLIENT_ID})], + ) + responses.post( + TOKEN_URL, + json=make_token_response(), + match=[urlencoded_params_matcher({ + "grant_type": DEVICE_CODE_GRANT, + "device_code": DEVICE_CODE, + "client_id": CLIENT_ID, + })], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": f"Bearer {ACCESS_TOKEN}"})], + ) + requests.get("http://authorized_only", auth=auth) + captured = capsys.readouterr() + assert VERIFICATION_URI in captured.out + assert USER_CODE in captured.out + +def test_oauth2_device_code_flow_respects_prompt_callback(token_cache, responses: RequestsMock, capsys): + called = {} + def custom_prompt_callback(verification_uri, user_code): + called['uri'] = verification_uri + called['code'] = user_code + auth = requests_auth.OAuth2DeviceCode( + AUTHORIZATION_URL, + TOKEN_URL, + client_id=CLIENT_ID, + prompt_callback=custom_prompt_callback, + ) + responses.post( + AUTHORIZATION_URL, + json=make_authorization_response(), + match=[urlencoded_params_matcher({"client_id": CLIENT_ID})], + ) + responses.post( + TOKEN_URL, + json=make_token_response(), + match=[urlencoded_params_matcher({ + "grant_type": DEVICE_CODE_GRANT, + "device_code": DEVICE_CODE, + "client_id": CLIENT_ID, + })], + ) + responses.get( + "http://authorized_only", + match=[header_matcher({"Authorization": f"Bearer {ACCESS_TOKEN}"})], + ) + requests.get("http://authorized_only", auth=auth) + captured = capsys.readouterr() + assert called['uri'] == VERIFICATION_URI + assert called['code'] == USER_CODE + assert captured.out == "" From 66221f19421876da9e326470257c2351187dc6c2 Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Sat, 21 Feb 2026 17:59:17 +0000 Subject: [PATCH 08/12] Add delay in token request loop for device code flow --- requests_auth/_oauth2/device_code.py | 1 + 1 file changed, 1 insertion(+) diff --git a/requests_auth/_oauth2/device_code.py b/requests_auth/_oauth2/device_code.py index dc0348e..be96cb2 100644 --- a/requests_auth/_oauth2/device_code.py +++ b/requests_auth/_oauth2/device_code.py @@ -159,6 +159,7 @@ def request_new_token(self) -> tuple[str, str] | tuple[str, str, int, str]: "client_id": self.client_id, } token_request_data.update(self.additional_data) + time.sleep(interval) while time.time() - start_time < request_expires_in: token_response = self.session.post( self.token_url, From 3fad71332506d88b8b60fdd1507ca8bda16d5662 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 1 Mar 2026 12:32:24 +0000 Subject: [PATCH 09/12] Initial plan From 96fe254b555262e724e5bd27647a83f3dcadec12 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 1 Mar 2026 12:33:41 +0000 Subject: [PATCH 10/12] Add device code flow changelog entry Co-authored-by: da1910 <12086549+da1910@users.noreply.github.com> --- CHANGELOG.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b74556d..c10377c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added +- `requests_auth.OAuth2DeviceCode` handling [OAuth 2.0 Device Authorization Grant](https://datatracker.ietf.org/doc/html/rfc8628) (device code flow). + The following parameters allow customisation of this flow: + - `prompt_callback`: A function called with `verification_uri` and `user_code` once the device authorization request has been made. By default a `print` statement prompts the user to navigate to the URI and enter the code. + - `prefer_complete_verification_url`: When `True` and the server provides a `verification_uri_complete`, that URL is used instead so the user does not need to enter the code separately. + - `authorization_pending_status_code` / `slow_down_status_code`: Status codes used to detect the `authorization_pending` and `slow_down` polling responses respectively. + - `timeout`: Maximum number of seconds to wait for the user to complete authentication. Defaults to 3 minutes. + - `early_expiry`: Number of seconds before actual token expiry where the token will be considered as expired. Defaults to 30 seconds. + - `scope`, `header_name`, `header_value`, `token_field_name` and `session` parameters are also available (consistent with other flows). +- `requests_auth.Auth0DeviceCode` provides out-of-the-box support for the [Auth0](https://auth0.com) device code flow. +- `requests_auth.EntraIDDeviceCode` provides out-of-the-box support for the [Microsoft Entra ID](https://learn.microsoft.com/en-us/entra/identity-platform/v2-oauth2-device-code) device code flow. + ## [8.0.0] - 2024-06-18 ### Added - Adding explicit support for Python `3.12`. From 5ae4f5691acdf91003832f9d197e484465c1606a Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Mon, 2 Mar 2026 09:27:36 +0000 Subject: [PATCH 11/12] Update CHANGELOG.md --- CHANGELOG.md | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c10377c..8eeb194 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,10 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `prefer_complete_verification_url`: When `True` and the server provides a `verification_uri_complete`, that URL is used instead so the user does not need to enter the code separately. - `authorization_pending_status_code` / `slow_down_status_code`: Status codes used to detect the `authorization_pending` and `slow_down` polling responses respectively. - `timeout`: Maximum number of seconds to wait for the user to complete authentication. Defaults to 3 minutes. - - `early_expiry`: Number of seconds before actual token expiry where the token will be considered as expired. Defaults to 30 seconds. - - `scope`, `header_name`, `header_value`, `token_field_name` and `session` parameters are also available (consistent with other flows). -- `requests_auth.Auth0DeviceCode` provides out-of-the-box support for the [Auth0](https://auth0.com) device code flow. -- `requests_auth.EntraIDDeviceCode` provides out-of-the-box support for the [Microsoft Entra ID](https://learn.microsoft.com/en-us/entra/identity-platform/v2-oauth2-device-code) device code flow. +- `requests_auth.Auth0DeviceCode` provides specific support for the [Auth0](https://auth0.com/docs/get-started/authentication-and-authorization-flow/device-authorization-flow/call-your-api-using-the-device-authorization-flow) device code flow. +- `requests_auth.EntraIDDeviceCode` provides specific support for the [Microsoft Entra ID](https://learn.microsoft.com/en-us/entra/identity-platform/v2-oauth2-device-code) device code flow. ## [8.0.0] - 2024-06-18 ### Added From 0599d529c879c0b64f87bb69923f8b08a9c82f1f Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Mon, 2 Mar 2026 09:31:20 +0000 Subject: [PATCH 12/12] Use old Tuple syntax to support EOL python versions --- requests_auth/_oauth2/device_code.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/requests_auth/_oauth2/device_code.py b/requests_auth/_oauth2/device_code.py index be96cb2..c23fa5e 100644 --- a/requests_auth/_oauth2/device_code.py +++ b/requests_auth/_oauth2/device_code.py @@ -2,7 +2,7 @@ import warnings from hashlib import sha512 -from typing import cast +from typing import cast, Tuple import requests @@ -123,7 +123,7 @@ def __call__(self, r: requests.Request) -> requests.Request: r.headers[self.header_name] = self.header_value.format(token=token) return r - def request_new_token(self) -> tuple[str, str] | tuple[str, str, int, str]: + def request_new_token(self) -> Tuple[str, str] | Tuple[str, str, int, str]: # As described in https://datatracker.ietf.org/doc/html/rfc8628#section-3.1 authorization_response: requests.Response = self.session.post( @@ -200,7 +200,7 @@ def request_new_token(self) -> tuple[str, str] | tuple[str, str, int, str]: raise InvalidGrantRequest(token_response) raise TimeoutOccurred(request_expires_in) - def refresh_token(self, refresh_token: str) -> tuple[str, str, int, str]: + def refresh_token(self, refresh_token: str) -> Tuple[str, str, int, str]: # As described in https://tools.ietf.org/html/rfc6749#section-6 self.refresh_data["refresh_token"] = refresh_token token, expires_in, refresh_token = request_new_grant_with_post(