-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathoauth2.py
More file actions
65 lines (53 loc) · 2.35 KB
/
oauth2.py
File metadata and controls
65 lines (53 loc) · 2.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import time
from typing import Optional
import requests
class OAuth2Client:
"""OAuth 2.0 client for handling client credentials flow with token caching"""
def __init__(self, client_id: str, client_secret: str, access_token_uri: str):
self.client_id = client_id
self.client_secret = client_secret
self.access_token_uri = access_token_uri
self.access_token: Optional[str] = None
self.token_expires_at: Optional[float] = None
def _is_token_expired(self, buffer_seconds: int = 30) -> bool:
"""Check if the current token is expired or will expire soon"""
if not self.access_token or not self.token_expires_at:
return True
# Add buffer time to refresh token before it actually expires
return time.time() >= (self.token_expires_at - buffer_seconds)
def _request_new_token(self) -> str:
"""Request a new access token from the OAuth server"""
# Prepare token request
token_data = {
'grant_type': 'client_credentials',
'client_id': self.client_id,
'client_secret': self.client_secret
}
headers = {
'Content-Type': 'application/x-www-form-urlencoded'
}
try:
response = requests.post(
self.access_token_uri,
data=token_data,
headers=headers,
timeout=30
)
response.raise_for_status()
token_response = response.json()
self.access_token = token_response.get('access_token')
# Calculate expiration time
expires_in = token_response.get('expires_in', 3600) # Default to 1 hour
self.token_expires_at = time.time() + expires_in
if not self.access_token:
raise ValueError("No access token received from OAuth server")
return self.access_token
except requests.exceptions.RequestException as e:
raise ValueError(f"Failed to get access token: {e}") from e
def get_access_token(self) -> str:
"""Get OAuth 2.0 access token using client credentials flow"""
# Check if we have a valid cached token
if not self._is_token_expired():
return self.access_token
# Token is expired or doesn't exist, get a new one
return self._request_new_token()