From 9bfdfbbef65ebf1cc804ef39118061ccae7d885f Mon Sep 17 00:00:00 2001 From: Roberto Prevato Date: Tue, 24 Feb 2026 17:49:00 +0100 Subject: [PATCH] Add support for Auth Code Flow + PKCE --- CHANGELOG.md | 35 +++++++ blacksheep/server/authentication/oidc.py | 124 +++++++++++++++++++---- 2 files changed, 138 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 04d29102..2a27a00e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,41 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ``` No application code change is required between environments. +- Add support for **PKCE** (Proof Key for Code Exchange, [RFC 7636](https://datatracker.ietf.org/doc/html/rfc7636)) to the OpenID Connect implementation. + - New `use_pkce: bool` setting on `OpenIDSettings`. When enabled, a `code_verifier` + is generated per sign-in request, stored inside the signed `state` parameter, and + the corresponding `code_challenge` (S256) is sent to the authorization endpoint. + The verifier is passed automatically in the token exchange request. + - New `response_mode: str | None` setting on `OpenIDSettings`, with automatic + detection via `get_response_mode()`: + - PKCE **with** `client_secret` (confidential client, recommended for web apps) → + `"form_post"` — the authorization code arrives via `POST` form data. This is the + same behaviour as the existing secret-only flow, with PKCE added as an extra layer + of protection, consistent with [OAuth 2.1](https://datatracker.ietf.org/doc/html/draft-ietf-oauth-v2-1-12) + best practices. + - PKCE **without** `client_secret` (public client, e.g. native/desktop apps) → + `"query"` — the authorization code arrives via a `GET` redirect with the code in + the query string (this scenario is not supported by some identity providers). + - The value can be overridden explicitly if a specific provider requires it. + - The callback route is registered as `GET` or `POST` automatically based on the + effective `response_mode`. + - New module-level helpers: `generate_pkce_code_verifier()` and + `generate_pkce_code_challenge()`, both exported from + `blacksheep.server.authentication.oidc`. + + **PKCE + secret (extra layer of security for server-side web apps):** + + ```python + use_openid_connect( + app, + OpenIDSettings( + client_id="...", + authority="https://login.microsoftonline.com/", + client_secret=Secret("..."), # confidential client + use_pkce=True, # adds code_challenge on top of the secret + ), + ) + ``` ## [2.6.1] - 2026-02-22 :cat: diff --git a/blacksheep/server/authentication/oidc.py b/blacksheep/server/authentication/oidc.py index 22f0808b..85a70c82 100644 --- a/blacksheep/server/authentication/oidc.py +++ b/blacksheep/server/authentication/oidc.py @@ -1,9 +1,13 @@ """ This module provides classes to handle OpenID Connect authentication through integration -with OAuth applications, supporting Authorization Code Grant and Hybrid flows. +with OAuth applications, supporting Authorization Code Grant, Authorization Code Grant +with PKCE, and Hybrid flows. """ +import base64 +import hashlib import logging +import secrets from abc import ABC, abstractmethod from dataclasses import dataclass from datetime import datetime @@ -47,6 +51,19 @@ def get_logger() -> logging.Logger: logger = get_logger() +def generate_pkce_code_verifier(length: int = 64) -> str: + """Generate a cryptographically random code_verifier for PKCE (RFC 7636).""" + if not (43 <= length <= 128): + raise ValueError("code_verifier length must be between 43 and 128 characters.") + return secrets.token_urlsafe(length)[:length] + + +def generate_pkce_code_challenge(code_verifier: str) -> str: + """Derive a code_challenge from the code_verifier using S256 (SHA-256).""" + digest = hashlib.sha256(code_verifier.encode("ascii")).digest() + return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii") + + class OpenIDConfiguration: """ Proxy class for a remote OpenID Connect well-known configuration. @@ -109,11 +126,27 @@ class OpenIDSettings: scheme_name: str = "OpenIDConnect" error_redirect_path: str | None = None end_session_endpoint: str | None = None + use_pkce: bool = False + response_mode: str | None = None def __post_init__(self): if self.client_secret is not None and isinstance(self.client_secret, str): issue_deprecation_warning_for_secret_str() + def get_response_mode(self) -> str: + """Return the effective response_mode for the OIDC flow. + + - If explicitly set, use that value. + - PKCE without client_secret → "query" (required by providers like + Entra ID for public clients). + - Otherwise → "form_post". + """ + if self.response_mode: + return self.response_mode + if self.use_pkce and not self.client_secret: + return "query" + return "form_post" + def get_client_secret_value(self) -> str: if self.client_secret is None: raise ValueError("Missing client secret") @@ -175,17 +208,18 @@ def get_redirect_url(self, request: Request) -> str: return str(get_absolute_url_to_path(request, self._settings.callback_path)) def build_signin_parameters(self, request: Request): - if self._settings.client_secret: - # authorization code grant + if self._settings.use_pkce or self._settings.client_secret: + # authorization code grant (with or without PKCE) response_type = "code" else: # hybrid flow, requires implicit flow for id_token to be enabled response_type = "id_token" state = self.get_state(request) + response_mode = self._settings.get_response_mode() parameters = { "response_type": response_type, - "response_mode": "form_post", + "response_mode": response_mode, "scope": self.scope, "client_id": self._settings.client_id, "redirect_uri": self._settings.redirect_uri @@ -193,6 +227,14 @@ def build_signin_parameters(self, request: Request): "nonce": state.get("nonce") or generate_secret(8), } + if self._settings.use_pkce: + code_verifier = generate_pkce_code_verifier() + state["code_verifier"] = code_verifier + parameters["code_challenge"] = generate_pkce_code_challenge( + code_verifier + ) + parameters["code_challenge_method"] = "S256" + if self._settings.audience: # Note: Auth0 and Okta use `audience` parameter when # the scope includes custom scopes @@ -207,19 +249,32 @@ def _require_secret(self): if not self._settings.client_secret: raise MissingClientSecretSettingError() - def build_code_grant_parameters(self, request: Request, code: str): - self._require_secret() + def build_code_grant_parameters( + self, + request: Request, + code: str, + code_verifier: str | None = None, + ): + if not code_verifier: + self._require_secret() - return { + params: dict[str, str] = { "grant_type": "authorization_code", "code": code, "scope": self.scope, "redirect_uri": self._settings.redirect_uri or self.get_redirect_url(request), "client_id": self._settings.client_id, - "client_secret": self._settings.get_client_secret_value(), } + if code_verifier: + params["code_verifier"] = code_verifier + + if self._settings.client_secret: + params["client_secret"] = self._settings.get_client_secret_value() + + return params + def build_refresh_token_parameters(self, refresh_token: str): self._require_secret() @@ -456,7 +511,7 @@ async def _set_tokens_in_response( request: Request, response: Response, id_token: IDToken, - token_response: TokenResponse, + token_response: TokenResponse | None, ): self.auth_handler.set_cookie( id_token.data, response, secure=request.scheme == "https" @@ -484,7 +539,7 @@ async def get_logout_response( await self.tokens_store.unset_tokens(request) return response - async def authenticate(self, context: Request) -> Identity | None: + async def authenticate(self, context: Request) -> Identity | None: # type: ignore await self.auth_handler.authenticate(context) if self.tokens_store: @@ -859,7 +914,15 @@ async def handle_auth_redirect(self, request: Request) -> Response: Handles the redirect after the user interacted with the sign-in page of a remote authorization server. """ - data = await request.form() + if self._settings.get_response_mode() == "query": + # Authorization code delivered via query string (GET redirect) + raw_query = request.query + data: dict[str, Any] = { + k: v[0] if len(v) == 1 else v for k, v in raw_query.items() + } + else: + # Authorization code delivered via form POST (form_post) + data = await request.form() # type: ignore[assignment] if data is None or not data: return accepted() @@ -868,7 +931,7 @@ async def handle_auth_redirect(self, request: Request) -> Response: if error: logger.error( - "Received a post request with error message to the OIDC authorization " + "Received a request with error message to the OIDC authorization " "callback endpoint: %s", error, ) @@ -889,14 +952,23 @@ async def handle_auth_redirect(self, request: Request) -> Response: id_token = data.get("id_token") token_response = None - if id_token and not settings.client_secret: + # Extract code_verifier from state when PKCE is in use + code_verifier = ( + state.get("code_verifier") + if isinstance(state, dict) and settings.use_pkce + else None + ) + + if id_token and not settings.client_secret and not settings.use_pkce: logger.debug("Successfully obtained an id_token for a user.") else: code = data.get("code") - if settings.client_secret and isinstance(code, str): - # extra call to fetch an access token - token_response = await self.exchange_token(request, code) + if (settings.client_secret or code_verifier) and isinstance(code, str): + # extra call to fetch an access token (with secret or PKCE verifier) + token_response = await self.exchange_token( + request, code, code_verifier=code_verifier + ) await self.events.on_tokens_received.fire(token_response) @@ -929,11 +1001,16 @@ async def handle_auth_redirect(self, request: Request) -> Response: request, IDToken(id_token, parsed_id_token), token_response, redirect_path ) - async def exchange_token(self, request: Request, code: str) -> TokenResponse: + async def exchange_token( + self, + request: Request, + code: str, + code_verifier: str | None = None, + ) -> TokenResponse: configuration = await self.get_openid_configuration() code_grant_parameters = self.parameters_builder.build_code_grant_parameters( - request, code + request, code, code_verifier=code_verifier ) try: @@ -943,8 +1020,9 @@ async def exchange_token(self, request: Request, code: str) -> TokenResponse: except FailedRequestError as request_error: logger.error( "Failed to exchange an authorization code with an access token. " - "Inspect the exception details for more details on the cause of the " - "failure.", + "Response status: %s, body: %s", + request_error.status, + request_error.data, exc_info=request_error, ) raise OpenIDConnectFailedExchangeError(request_error) @@ -1095,8 +1173,12 @@ def use_openid_connect( async def redirect_to_sign_in(request: Request): return await handler.redirect_to_sign_in(request) + _callback_method = ( + app.router.get if settings.get_response_mode() == "query" else app.router.post + ) + @allow_anonymous() - @app.router.post(settings.callback_path) + @_callback_method(settings.callback_path) @cache_control(no_cache=True, no_store=True) async def handle_auth_redirect(request: Request): return await handler.handle_auth_redirect(request)