-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdiff_app_routers_admin.py
More file actions
119 lines (103 loc) · 4.23 KB
/
diff_app_routers_admin.py
File metadata and controls
119 lines (103 loc) · 4.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
diff --git a/app/routers/admin.py b/app/routers/admin.py
index 5a820a32..ba75cd93 100644
--- a/app/routers/admin.py
+++ b/app/routers/admin.py
@@ -6,6 +6,7 @@ from typing import List, Dict, Any
import os
from app.middleware.tenant import get_current_user_role
+from app.middleware.tenant_context import require_admin_role
from app.services.log_utils import verify_log_chain_integrity, generate_audit_report
from app.models import ValidationLog
from app.services.background_jobs import get_background_job_service
@@ -14,15 +15,13 @@ from app.middleware.rate_limit import get_rate_limiter
from sqlalchemy.orm import Session
from datetime import datetime
-router = APIRouter(prefix="/admin", tags=["admin"])
+router = APIRouter(prefix="/admin", tags=["admin"], dependencies=[Depends(require_admin_role)])
@router.get("/settings")
async def get_settings(request: Request, role: str = Depends(get_current_user_role)):
"""
Get system settings and feature flags (admin only)
"""
- if role != "admin":
- raise HTTPException(status_code=403, detail="Admin access required")
return {
"feature_flags": {
@@ -51,8 +50,6 @@ async def list_tenants(request: Request, role: str = Depends(get_current_user_ro
"""
List all tenants (admin only)
"""
- if role != "admin":
- raise HTTPException(status_code=403, detail="Admin access required")
# This is a placeholder - implement actual tenant listing
return [
@@ -70,8 +67,6 @@ async def list_api_keys(request: Request, role: str = Depends(get_current_user_r
"""
List all API keys (admin only)
"""
- if role != "admin":
- raise HTTPException(status_code=403, detail="Admin access required")
# This is a placeholder - implement actual API key listing
return [
@@ -96,8 +91,6 @@ async def list_validation_logs(
"""
List validation logs (admin only)
"""
- # if role != "admin":
- # raise HTTPException(status_code=403, detail="Admin access required")
import sqlite3
import json
@@ -176,8 +169,6 @@ async def get_validation_log_with_hash(
"""
Get validation log with tamper-evident hash (admin only)
"""
- if role != "admin":
- raise HTTPException(status_code=403, detail="Admin access required")
log = db.query(ValidationLog).filter(ValidationLog.id == log_id).first()
if not log:
@@ -206,8 +197,6 @@ async def verify_log_integrity(
"""
Verify integrity of validation log chain (admin only)
"""
- if role != "admin":
- raise HTTPException(status_code=403, detail="Admin access required")
return verify_log_chain_integrity(db, limit)
@@ -222,8 +211,6 @@ async def generate_compliance_audit_report(
"""
Generate comprehensive audit report (admin only)
"""
- if role != "admin":
- raise HTTPException(status_code=403, detail="Admin access required")
start_dt = datetime.fromisoformat(start_date) if start_date else None
end_dt = datetime.fromisoformat(end_date) if end_date else None
@@ -238,8 +225,6 @@ async def get_rate_limiting_status(
"""
Get rate limiting status and statistics (admin only)
"""
- if role != "admin":
- raise HTTPException(status_code=403, detail="Admin access required")
rate_limiter = get_rate_limiter()
@@ -269,8 +254,6 @@ async def get_background_jobs_status(
"""
Get background jobs status (admin only)
"""
- if role != "admin":
- raise HTTPException(status_code=403, detail="Admin access required")
bg_service = get_background_job_service()
return bg_service.get_status()
@@ -284,8 +267,6 @@ async def run_api_key_expiry_job(
"""
Manually trigger API key expiry job (admin only)
"""
- if role != "admin":
- raise HTTPException(status_code=403, detail="Admin access required")
bg_service = get_background_job_service()
result = await bg_service.run_api_key_expiry_job()
@@ -300,8 +281,6 @@ async def run_usage_reset_jobs(
"""
Manually trigger usage reset jobs (admin only)
"""
- if role != "admin":
- raise HTTPException(status_code=403, detail="Admin access required")
bg_service = get_background_job_service()
result = await bg_service.run_usage_reset_jobs()