Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 31 additions & 14 deletions samples/server/whoami.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
"""
An FastMCP server that provides a "WhoAmI" tool to return the current user's information.

This server is compatible with OpenID Connect (OIDC) providers and uses the `mcpauth` library
to handle authorization. Please check https://mcp-auth.dev/docs/tutorials/whoami for more
information on how to use this server.
"""

import os
from typing import Any
from mcp.server.fastmcp import FastMCP
Expand Down Expand Up @@ -41,6 +49,11 @@ def whoami() -> dict[str, Any]:


def verify_access_token(token: str) -> AuthInfo:
"""
Verifies the provided Bearer token by fetching user information from the authorization server.
If the token is valid, it returns an `AuthInfo` object containing the user's information.
"""

endpoint = auth_server_config.metadata.userinfo_endpoint
if not endpoint:
raise ValueError(
Expand All @@ -50,37 +63,41 @@ def verify_access_token(token: str) -> AuthInfo:
try:
response = requests.get(
endpoint,
headers={"Authorization": f"Bearer {token}"},
headers={
"Authorization": f"Bearer {token}"
}, # Standard Bearer token header
)
response.raise_for_status()
json = response.json()
response.raise_for_status() # Ensure we raise an error for HTTP errors
json = response.json() # Parse the JSON response
return AuthInfo(
token=token,
subject=json.get("sub"),
issuer=auth_issuer,
claims=json,
subject=json.get(
"sub"
), # 'sub' is a standard claim for the subject (user's ID)
issuer=auth_issuer, # Use the configured issuer
claims=json, # Include all claims (JSON fields) returned by the userinfo endpoint
)
# `AuthInfo` is a Pydantic model, so validation errors usually mean the response didn't match
# the expected structure
except pydantic.ValidationError as e:
raise MCPAuthTokenVerificationException(
MCPAuthTokenVerificationExceptionCode.INVALID_TOKEN,
cause=e,
)
# Handle other exceptions that may occur during the request
except Exception as e:
raise MCPAuthTokenVerificationException(
MCPAuthTokenVerificationExceptionCode.TOKEN_VERIFICATION_FAILED,
cause=e,
)


bearer_auth = Middleware(mcp_auth.bearer_auth_middleware(verify_access_token))
app = Starlette(
routes=[
# Add the metadata route (`/.well-known/oauth-authorization-server`)
mcp_auth.metadata_route(),
Mount(
"/",
app=mcp.sse_app(),
middleware=[
Middleware(mcp_auth.bearer_auth_middleware(verify_access_token))
],
),
]
# Protect the MCP server with the Bearer auth middleware
Mount("/", app=mcp.sse_app(), middleware=[bearer_auth]),
],
)