-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathauth.py
More file actions
295 lines (238 loc) · 9.75 KB
/
auth.py
File metadata and controls
295 lines (238 loc) · 9.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
"""
Firebase Authentication Module
This module provides Firebase token verification and authentication decorators
for protecting Flask routes. It uses Firebase Admin SDK for server-side token
verification.
Usage:
from auth import require_auth, require_payment
@app.route("/protected")
@require_auth
def protected_route():
user = g.current_user # Contains firebase_uid, email, etc.
return jsonify({"user": user})
@app.route("/paid-only")
@require_auth
@require_payment
def paid_route():
# Only accessible to users who have paid
return jsonify({"message": "Premium content"})
"""
import os
import functools
from flask import request, jsonify, g
from typing import Optional, Dict, Any
# Firebase Admin SDK
import firebase_admin
from firebase_admin import credentials, auth as firebase_auth
from db import get_user_by_firebase_uid, create_or_update_user, check_user_payment_status
# Initialize Firebase Admin SDK
_firebase_app = None
def _init_firebase():
"""Initialize Firebase Admin SDK. Call once at app startup."""
global _firebase_app
if _firebase_app is not None:
return _firebase_app
# Option 1: Use service account JSON file
service_account_path = os.getenv("FIREBASE_SERVICE_ACCOUNT_PATH")
if service_account_path and os.path.exists(service_account_path):
cred = credentials.Certificate(service_account_path)
_firebase_app = firebase_admin.initialize_app(cred)
return _firebase_app
# Option 2: Use environment variable with JSON content
service_account_json = os.getenv("FIREBASE_SERVICE_ACCOUNT_JSON")
if service_account_json:
import json
cred_dict = json.loads(service_account_json)
cred = credentials.Certificate(cred_dict)
_firebase_app = firebase_admin.initialize_app(cred)
return _firebase_app
# Option 3: Use Application Default Credentials (for GCP environments)
# Or just project ID for development
project_id = os.getenv("FIREBASE_PROJECT_ID")
if project_id:
_firebase_app = firebase_admin.initialize_app(options={"projectId": project_id})
return _firebase_app
raise ValueError(
"Firebase credentials not configured. Set one of: "
"FIREBASE_SERVICE_ACCOUNT_PATH, FIREBASE_SERVICE_ACCOUNT_JSON, or FIREBASE_PROJECT_ID"
)
def verify_firebase_token(id_token: str) -> Optional[Dict[str, Any]]:
"""
Verify a Firebase ID token and return decoded token data.
Args:
id_token: The Firebase ID token from the client
Returns:
Decoded token containing uid, email, name, etc. or None if invalid
"""
try:
_init_firebase()
decoded_token = firebase_auth.verify_id_token(id_token)
return {
"uid": decoded_token.get("uid"),
"email": decoded_token.get("email"),
"name": decoded_token.get("name"),
"picture": decoded_token.get("picture"),
"email_verified": decoded_token.get("email_verified", False),
"auth_time": decoded_token.get("auth_time"),
}
except firebase_admin.exceptions.FirebaseError as e:
print(f"Firebase token verification failed: {e}")
return None
except Exception as e:
print(f"Token verification error: {e}")
return None
def get_token_from_request() -> Optional[str]:
"""Extract Bearer token from Authorization header."""
auth_header = request.headers.get("Authorization", "")
if auth_header.startswith("Bearer "):
return auth_header[7:] # Remove "Bearer " prefix
return None
def require_auth(f):
"""
Decorator that requires a valid Firebase ID token.
Sets g.current_user with user info from database (or creates new user).
Returns 401 if token is missing or invalid.
"""
@functools.wraps(f)
def decorated_function(*args, **kwargs):
token = get_token_from_request()
if not token:
return jsonify({
"error": "Authorization required",
"message": "Missing Bearer token in Authorization header"
}), 401
decoded = verify_firebase_token(token)
if not decoded:
return jsonify({
"error": "Invalid token",
"message": "Firebase ID token is invalid or expired"
}), 401
# Get or create user in database
firebase_uid = decoded["uid"]
user = get_user_by_firebase_uid(firebase_uid)
if not user:
# First-time user - create in database
create_or_update_user(
firebase_uid=firebase_uid,
email=decoded.get("email"),
display_name=decoded.get("name")
)
user = get_user_by_firebase_uid(firebase_uid)
# Store user in Flask's g context for use in route handlers
g.current_user = user
g.firebase_token = decoded
return f(*args, **kwargs)
return decorated_function
def require_payment(f):
"""
Decorator that requires the user to have report credits available.
Must be used AFTER @require_auth decorator.
Returns 402 Payment Required if user has no credits.
"""
@functools.wraps(f)
def decorated_function(*args, **kwargs):
# Ensure require_auth was called first
if not hasattr(g, 'current_user') or g.current_user is None:
return jsonify({
"error": "Authentication required",
"message": "This endpoint requires authentication"
}), 401
user = g.current_user
remaining_credits = user.get("report_credits", 0)
if remaining_credits <= 0:
return jsonify({
"error": "Payment required",
"message": "You have no report credits remaining. Please purchase more credits to continue.",
"payment_status": "no_credits",
"remaining_credits": 0
}), 402
return f(*args, **kwargs)
return decorated_function
def consume_credit(f):
"""
Decorator that atomically consumes one report credit after successful execution.
Must be used AFTER @require_auth and @require_payment decorators.
This decorator:
1. Attempts to atomically consume one credit before calling the wrapped function
2. If credit consumption fails (race condition), returns 402
3. On success, includes remaining_credits in the response
Note: Credit is consumed BEFORE report generation to prevent generating
reports without valid credits in race conditions.
"""
@functools.wraps(f)
def decorated_function(*args, **kwargs):
from db import consume_user_credit, get_user_credits
# Ensure require_auth was called first
if not hasattr(g, 'current_user') or g.current_user is None:
return jsonify({
"error": "Authentication required",
"message": "This endpoint requires authentication"
}), 401
user = g.current_user
firebase_uid = user.get("firebase_uid")
# Atomically consume one credit
if not consume_user_credit(firebase_uid):
return jsonify({
"error": "Payment required",
"message": "Unable to consume credit. You may have no credits remaining.",
"payment_status": "no_credits",
"remaining_credits": 0
}), 402
# Store remaining credits for use in response (after consumption)
g.remaining_credits = get_user_credits(firebase_uid)
return f(*args, **kwargs)
return decorated_function
def optional_auth(f):
"""
Decorator that optionally processes authentication if present.
Unlike require_auth, this doesn't return 401 for missing/invalid tokens.
Sets g.current_user to None if no valid auth is present.
"""
@functools.wraps(f)
def decorated_function(*args, **kwargs):
g.current_user = None
g.firebase_token = None
token = get_token_from_request()
if token:
decoded = verify_firebase_token(token)
if decoded:
firebase_uid = decoded["uid"]
user = get_user_by_firebase_uid(firebase_uid)
if user:
g.current_user = user
g.firebase_token = decoded
return f(*args, **kwargs)
return decorated_function
def require_admin(f):
"""
Decorator that requires the authenticated user to be an admin.
Must be used AFTER @require_auth decorator.
Checks g.firebase_token["email"] against the ADMIN_EMAILS environment variable
(comma-separated list of admin email addresses).
Returns 403 Forbidden if not an admin.
"""
@functools.wraps(f)
def decorated_function(*args, **kwargs):
# Ensure require_auth was called first
if not hasattr(g, 'firebase_token') or g.firebase_token is None:
return jsonify({
"error": "Authentication required",
"message": "This endpoint requires authentication"
}), 401
user_email = (g.firebase_token.get("email") or "").lower().strip()
admin_emails_raw = os.getenv("ADMIN_EMAILS", "")
admin_emails = [
e.strip().lower() for e in admin_emails_raw.split(",") if e.strip()
]
if not admin_emails:
return jsonify({
"error": "Admin not configured",
"message": "No admin emails have been configured on the server"
}), 403
if user_email not in admin_emails:
return jsonify({
"error": "Forbidden",
"message": "You do not have admin access"
}), 403
return f(*args, **kwargs)
return decorated_function