From 8993b6a515f53592bad33b45ede0591d7df23f3e Mon Sep 17 00:00:00 2001 From: Daniel Date: Tue, 12 Jul 2022 20:14:00 +0100 Subject: [PATCH] Support optional iat grace period --- fastapi_cloudauth/base.py | 2 ++ fastapi_cloudauth/cognito.py | 2 ++ fastapi_cloudauth/verification.py | 4 +++- tests/test_verification.py | 30 ++++++++++++++++++++++++++++++ 4 files changed, 37 insertions(+), 1 deletion(-) diff --git a/fastapi_cloudauth/base.py b/fastapi_cloudauth/base.py index 052c79d..827ded5 100644 --- a/fastapi_cloudauth/base.py +++ b/fastapi_cloudauth/base.py @@ -193,6 +193,7 @@ def __init__( scope_name: Optional[List[str]] = None, scope_key: Optional[str] = None, auto_error: bool = True, + iat_grace_period: int = 0, op: Operator = Operator._all, extra: Optional[ExtraVerifier] = None, ): @@ -210,6 +211,7 @@ def __init__( op=op, scope_key=self._scope_key, auto_error=self.auto_error, + iat_grace_period=iat_grace_period, extra=extra, ) diff --git a/fastapi_cloudauth/cognito.py b/fastapi_cloudauth/cognito.py index 40653b0..0874e71 100644 --- a/fastapi_cloudauth/cognito.py +++ b/fastapi_cloudauth/cognito.py @@ -31,6 +31,7 @@ def __init__( client_id: str, scope_key: Optional[str] = "cognito:groups", auto_error: bool = True, + iat_grace_period: int = 0, ): url = f"https://cognito-idp.{region}.amazonaws.com/{userPoolId}/.well-known/jwks.json" jwks = JWKS(url=url) @@ -40,6 +41,7 @@ def __init__( issuer=f"https://cognito-idp.{region}.amazonaws.com/{userPoolId}", scope_key=scope_key, auto_error=auto_error, + iat_grace_period=iat_grace_period, extra=CognitoExtraVerifier( client_id=client_id, issuer=f"https://cognito-idp.{region}.amazonaws.com/{userPoolId}", diff --git a/fastapi_cloudauth/verification.py b/fastapi_cloudauth/verification.py index 4c51c9b..836dfa3 100644 --- a/fastapi_cloudauth/verification.py +++ b/fastapi_cloudauth/verification.py @@ -141,6 +141,7 @@ def __init__( audience: Optional[Union[str, List[str]]] = None, issuer: Optional[str] = None, auto_error: bool = True, + iat_grace_period: int = 0, *args: Any, extra: Optional[ExtraVerifier] = None, **kwargs: Any @@ -153,6 +154,7 @@ def __init__( self._extra_verifier = extra self._aud = audience self._iss = issuer + self._iat_grace_period = iat_grace_period @property def auto_error(self) -> bool: @@ -237,7 +239,7 @@ def _verify_claims(self, http_auth: HTTPAuthorizationCredentials) -> bool: if claims.get("iat"): iat = int(claims["iat"]) now = timegm(datetime.utcnow().utctimetuple()) - if now < iat: + if now < iat - self._iat_grace_period: if self.auto_error: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=NOT_VERIFIED diff --git a/tests/test_verification.py b/tests/test_verification.py index 8b8b0ef..706c5d8 100644 --- a/tests/test_verification.py +++ b/tests/test_verification.py @@ -381,3 +381,33 @@ def test_verify_token(): and e.detail == messages.NOT_AUTHENTICATED ) _assert_verifier_no_error(token, verifier_no_error) + + +@pytest.mark.unittest +def test_verify_token_with_iat_grace_period(): + verifier = JWKsVerifier(jwks=JWKS.null()) + verifier_no_error = JWKsVerifier(jwks=JWKS.null(), auto_error=False) + + # iat is slightly in the future + token = jwt.encode( + { + "sub": "dummy-ID", + "exp": datetime.utcnow() + timedelta(hours=10), + "iat": datetime.utcnow() + timedelta(seconds=1), + }, + "dummy_secret", + headers={"alg": "HS256", "typ": "JWT", "kid": "dummy-kid"}, + ) + e = _assert_verifier(token, verifier) + assert e.status_code == HTTP_401_UNAUTHORIZED and e.detail == messages.NOT_VERIFIED + _assert_verifier_no_error(token, verifier_no_error) + + verifier_with_grace = JWKsVerifier(jwks=JWKS.null(), iat_grace_period=1) + verifier_with_grace_no_error = JWKsVerifier(jwks=JWKS.null(), iat_grace_period=1, auto_error=False) + + # Correct + verifier_with_grace._verify_claims(HTTPAuthorizationCredentials(scheme="a", credentials=token)) + verified = verifier_with_grace_no_error._verify_claims( + HTTPAuthorizationCredentials(scheme="a", credentials=token) + ) + assert verified != False \ No newline at end of file