From 2013746209d66dcc7c43273939a1c993a3941f41 Mon Sep 17 00:00:00 2001 From: Gao Sun Date: Sat, 3 May 2025 15:58:12 -0700 Subject: [PATCH] refactor: polish sample --- samples/server/whoami.py | 45 +++++++++++++++++++++++++++------------- 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/samples/server/whoami.py b/samples/server/whoami.py index 5144052..80ac8e5 100644 --- a/samples/server/whoami.py +++ b/samples/server/whoami.py @@ -1,3 +1,11 @@ +""" +An FastMCP server that provides a "WhoAmI" tool to return the current user's information. + +This server is compatible with OpenID Connect (OIDC) providers and uses the `mcpauth` library +to handle authorization. Please check https://mcp-auth.dev/docs/tutorials/whoami for more +information on how to use this server. +""" + import os from typing import Any from mcp.server.fastmcp import FastMCP @@ -41,6 +49,11 @@ def whoami() -> dict[str, Any]: def verify_access_token(token: str) -> AuthInfo: + """ + Verifies the provided Bearer token by fetching user information from the authorization server. + If the token is valid, it returns an `AuthInfo` object containing the user's information. + """ + endpoint = auth_server_config.metadata.userinfo_endpoint if not endpoint: raise ValueError( @@ -50,21 +63,28 @@ def verify_access_token(token: str) -> AuthInfo: try: response = requests.get( endpoint, - headers={"Authorization": f"Bearer {token}"}, + headers={ + "Authorization": f"Bearer {token}" + }, # Standard Bearer token header ) - response.raise_for_status() - json = response.json() + response.raise_for_status() # Ensure we raise an error for HTTP errors + json = response.json() # Parse the JSON response return AuthInfo( token=token, - subject=json.get("sub"), - issuer=auth_issuer, - claims=json, + subject=json.get( + "sub" + ), # 'sub' is a standard claim for the subject (user's ID) + issuer=auth_issuer, # Use the configured issuer + claims=json, # Include all claims (JSON fields) returned by the userinfo endpoint ) + # `AuthInfo` is a Pydantic model, so validation errors usually mean the response didn't match + # the expected structure except pydantic.ValidationError as e: raise MCPAuthTokenVerificationException( MCPAuthTokenVerificationExceptionCode.INVALID_TOKEN, cause=e, ) + # Handle other exceptions that may occur during the request except Exception as e: raise MCPAuthTokenVerificationException( MCPAuthTokenVerificationExceptionCode.TOKEN_VERIFICATION_FAILED, @@ -72,15 +92,12 @@ def verify_access_token(token: str) -> AuthInfo: ) +bearer_auth = Middleware(mcp_auth.bearer_auth_middleware(verify_access_token)) app = Starlette( routes=[ + # Add the metadata route (`/.well-known/oauth-authorization-server`) mcp_auth.metadata_route(), - Mount( - "/", - app=mcp.sse_app(), - middleware=[ - Middleware(mcp_auth.bearer_auth_middleware(verify_access_token)) - ], - ), - ] + # Protect the MCP server with the Bearer auth middleware + Mount("/", app=mcp.sse_app(), middleware=[bearer_auth]), + ], )