Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,41 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
```

No application code change is required between environments.
- Add support for **PKCE** (Proof Key for Code Exchange, [RFC 7636](https://datatracker.ietf.org/doc/html/rfc7636)) to the OpenID Connect implementation.
- New `use_pkce: bool` setting on `OpenIDSettings`. When enabled, a `code_verifier`
is generated per sign-in request, stored inside the signed `state` parameter, and
the corresponding `code_challenge` (S256) is sent to the authorization endpoint.
The verifier is passed automatically in the token exchange request.
- New `response_mode: str | None` setting on `OpenIDSettings`, with automatic
detection via `get_response_mode()`:
- PKCE **with** `client_secret` (confidential client, recommended for web apps) →
`"form_post"` — the authorization code arrives via `POST` form data. This is the
same behaviour as the existing secret-only flow, with PKCE added as an extra layer
of protection, consistent with [OAuth 2.1](https://datatracker.ietf.org/doc/html/draft-ietf-oauth-v2-1-12)
best practices.
- PKCE **without** `client_secret` (public client, e.g. native/desktop apps) →
`"query"` — the authorization code arrives via a `GET` redirect with the code in
the query string (this scenario is not supported by some identity providers).
- The value can be overridden explicitly if a specific provider requires it.
- The callback route is registered as `GET` or `POST` automatically based on the
effective `response_mode`.
- New module-level helpers: `generate_pkce_code_verifier()` and
`generate_pkce_code_challenge()`, both exported from
`blacksheep.server.authentication.oidc`.

**PKCE + secret (extra layer of security for server-side web apps):**

```python
use_openid_connect(
app,
OpenIDSettings(
client_id="...",
authority="https://login.microsoftonline.com/<tenant>",
client_secret=Secret("..."), # confidential client
use_pkce=True, # adds code_challenge on top of the secret
),
)
```

## [2.6.1] - 2026-02-22 :cat:

Expand Down
124 changes: 103 additions & 21 deletions blacksheep/server/authentication/oidc.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
"""
This module provides classes to handle OpenID Connect authentication through integration
with OAuth applications, supporting Authorization Code Grant and Hybrid flows.
with OAuth applications, supporting Authorization Code Grant, Authorization Code Grant
with PKCE, and Hybrid flows.
"""

import base64
import hashlib
import logging
import secrets
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
Expand Down Expand Up @@ -47,6 +51,19 @@ def get_logger() -> logging.Logger:
logger = get_logger()


def generate_pkce_code_verifier(length: int = 64) -> str:
"""Generate a cryptographically random code_verifier for PKCE (RFC 7636)."""
if not (43 <= length <= 128):
raise ValueError("code_verifier length must be between 43 and 128 characters.")
return secrets.token_urlsafe(length)[:length]


def generate_pkce_code_challenge(code_verifier: str) -> str:
"""Derive a code_challenge from the code_verifier using S256 (SHA-256)."""
digest = hashlib.sha256(code_verifier.encode("ascii")).digest()
return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")


class OpenIDConfiguration:
"""
Proxy class for a remote OpenID Connect well-known configuration.
Expand Down Expand Up @@ -109,11 +126,27 @@ class OpenIDSettings:
scheme_name: str = "OpenIDConnect"
error_redirect_path: str | None = None
end_session_endpoint: str | None = None
use_pkce: bool = False
response_mode: str | None = None

def __post_init__(self):
if self.client_secret is not None and isinstance(self.client_secret, str):
issue_deprecation_warning_for_secret_str()

def get_response_mode(self) -> str:
"""Return the effective response_mode for the OIDC flow.

- If explicitly set, use that value.
- PKCE without client_secret → "query" (required by providers like
Entra ID for public clients).
- Otherwise → "form_post".
"""
if self.response_mode:
return self.response_mode
if self.use_pkce and not self.client_secret:
return "query"
return "form_post"

def get_client_secret_value(self) -> str:
if self.client_secret is None:
raise ValueError("Missing client secret")
Expand Down Expand Up @@ -175,24 +208,33 @@ def get_redirect_url(self, request: Request) -> str:
return str(get_absolute_url_to_path(request, self._settings.callback_path))

def build_signin_parameters(self, request: Request):
if self._settings.client_secret:
# authorization code grant
if self._settings.use_pkce or self._settings.client_secret:
# authorization code grant (with or without PKCE)
response_type = "code"
else:
# hybrid flow, requires implicit flow for id_token to be enabled
response_type = "id_token"

state = self.get_state(request)
response_mode = self._settings.get_response_mode()
parameters = {
"response_type": response_type,
"response_mode": "form_post",
"response_mode": response_mode,
"scope": self.scope,
"client_id": self._settings.client_id,
"redirect_uri": self._settings.redirect_uri
or self.get_redirect_url(request),
"nonce": state.get("nonce") or generate_secret(8),
}

if self._settings.use_pkce:
code_verifier = generate_pkce_code_verifier()
state["code_verifier"] = code_verifier
parameters["code_challenge"] = generate_pkce_code_challenge(
code_verifier
)
parameters["code_challenge_method"] = "S256"

if self._settings.audience:
# Note: Auth0 and Okta use `audience` parameter when
# the scope includes custom scopes
Expand All @@ -207,19 +249,32 @@ def _require_secret(self):
if not self._settings.client_secret:
raise MissingClientSecretSettingError()

def build_code_grant_parameters(self, request: Request, code: str):
self._require_secret()
def build_code_grant_parameters(
self,
request: Request,
code: str,
code_verifier: str | None = None,
):
if not code_verifier:
self._require_secret()

return {
params: dict[str, str] = {
"grant_type": "authorization_code",
"code": code,
"scope": self.scope,
"redirect_uri": self._settings.redirect_uri
or self.get_redirect_url(request),
"client_id": self._settings.client_id,
"client_secret": self._settings.get_client_secret_value(),
}

if code_verifier:
params["code_verifier"] = code_verifier

if self._settings.client_secret:
params["client_secret"] = self._settings.get_client_secret_value()

return params

def build_refresh_token_parameters(self, refresh_token: str):
self._require_secret()

Expand Down Expand Up @@ -456,7 +511,7 @@ async def _set_tokens_in_response(
request: Request,
response: Response,
id_token: IDToken,
token_response: TokenResponse,
token_response: TokenResponse | None,
):
self.auth_handler.set_cookie(
id_token.data, response, secure=request.scheme == "https"
Expand Down Expand Up @@ -484,7 +539,7 @@ async def get_logout_response(
await self.tokens_store.unset_tokens(request)
return response

async def authenticate(self, context: Request) -> Identity | None:
async def authenticate(self, context: Request) -> Identity | None: # type: ignore
await self.auth_handler.authenticate(context)

if self.tokens_store:
Expand Down Expand Up @@ -859,7 +914,15 @@ async def handle_auth_redirect(self, request: Request) -> Response:
Handles the redirect after the user interacted with the sign-in page of a remote
authorization server.
"""
data = await request.form()
if self._settings.get_response_mode() == "query":
# Authorization code delivered via query string (GET redirect)
raw_query = request.query
data: dict[str, Any] = {
k: v[0] if len(v) == 1 else v for k, v in raw_query.items()
}
else:
# Authorization code delivered via form POST (form_post)
data = await request.form() # type: ignore[assignment]

if data is None or not data:
return accepted()
Expand All @@ -868,7 +931,7 @@ async def handle_auth_redirect(self, request: Request) -> Response:

if error:
logger.error(
"Received a post request with error message to the OIDC authorization "
"Received a request with error message to the OIDC authorization "
"callback endpoint: %s",
error,
)
Expand All @@ -889,14 +952,23 @@ async def handle_auth_redirect(self, request: Request) -> Response:
id_token = data.get("id_token")
token_response = None

if id_token and not settings.client_secret:
# Extract code_verifier from state when PKCE is in use
code_verifier = (
state.get("code_verifier")
if isinstance(state, dict) and settings.use_pkce
else None
)

if id_token and not settings.client_secret and not settings.use_pkce:
logger.debug("Successfully obtained an id_token for a user.")
else:
code = data.get("code")

if settings.client_secret and isinstance(code, str):
# extra call to fetch an access token
token_response = await self.exchange_token(request, code)
if (settings.client_secret or code_verifier) and isinstance(code, str):
# extra call to fetch an access token (with secret or PKCE verifier)
token_response = await self.exchange_token(
request, code, code_verifier=code_verifier
)

await self.events.on_tokens_received.fire(token_response)

Expand Down Expand Up @@ -929,11 +1001,16 @@ async def handle_auth_redirect(self, request: Request) -> Response:
request, IDToken(id_token, parsed_id_token), token_response, redirect_path
)

async def exchange_token(self, request: Request, code: str) -> TokenResponse:
async def exchange_token(
self,
request: Request,
code: str,
code_verifier: str | None = None,
) -> TokenResponse:
configuration = await self.get_openid_configuration()

code_grant_parameters = self.parameters_builder.build_code_grant_parameters(
request, code
request, code, code_verifier=code_verifier
)

try:
Expand All @@ -943,8 +1020,9 @@ async def exchange_token(self, request: Request, code: str) -> TokenResponse:
except FailedRequestError as request_error:
logger.error(
"Failed to exchange an authorization code with an access token. "
"Inspect the exception details for more details on the cause of the "
"failure.",
"Response status: %s, body: %s",
request_error.status,
request_error.data,
exc_info=request_error,
)
raise OpenIDConnectFailedExchangeError(request_error)
Expand Down Expand Up @@ -1095,8 +1173,12 @@ def use_openid_connect(
async def redirect_to_sign_in(request: Request):
return await handler.redirect_to_sign_in(request)

_callback_method = (
app.router.get if settings.get_response_mode() == "query" else app.router.post
)

@allow_anonymous()
@app.router.post(settings.callback_path)
@_callback_method(settings.callback_path)
@cache_control(no_cache=True, no_store=True)
async def handle_auth_redirect(request: Request):
return await handler.handle_auth_redirect(request)
Expand Down
Loading