Security and compliance are fundamental to the LLM Guardian Cluster's design. This comprehensive framework ensures data protection, privacy compliance, and robust security measures across all system components while maintaining the high performance and reliability required for AI workloads.
%% Defense-in-depth security architecture: traffic flows from the external
%% edge layer down through network, authn/authz, application, data, and
%% infrastructure security layers.
graph TB
    subgraph "External Security Layer"
        WAF[Web Application Firewall]
        DDoS[DDoS Protection]
        CDN[Content Delivery Network]
    end
    subgraph "Network Security Layer"
        LB[Load Balancer + TLS]
        VPN[VPN Gateway]
        FW[Network Firewall]
        IPS[Intrusion Prevention]
    end
    subgraph "Authentication & Authorization Layer"
        OAUTH[OAuth2/OIDC Provider]
        RBAC[Role-Based Access Control]
        MFA[Multi-Factor Authentication]
        JWT[JWT Token Validation]
    end
    subgraph "Application Security Layer"
        API_SEC[API Security Gateway]
        INPUT_VAL[Input Validation]
        OUTPUT_FILTER[Output Filtering]
        RATE_LIMIT[Rate Limiting]
    end
    subgraph "Data Security Layer"
        ENCRYPT[Encryption at Rest]
        TLS[TLS in Transit]
        TOKENIZATION[Data Tokenization]
        ANONYMIZATION[PII Anonymization]
    end
    subgraph "Infrastructure Security Layer"
        CONTAINER_SEC[Container Security]
        SECRETS[Secrets Management]
        COMPLIANCE[Compliance Engine]
        AUDIT[Audit Logging]
    end
    %% Layer-to-layer processing chain (edge -> audit)
    WAF --> LB
    DDoS --> LB
    CDN --> WAF
    LB --> OAUTH
    VPN --> FW
    FW --> IPS
    OAUTH --> RBAC
    RBAC --> MFA
    MFA --> JWT
    JWT --> API_SEC
    API_SEC --> INPUT_VAL
    INPUT_VAL --> OUTPUT_FILTER
    OUTPUT_FILTER --> RATE_LIMIT
    RATE_LIMIT --> ENCRYPT
    ENCRYPT --> TLS
    TLS --> TOKENIZATION
    TOKENIZATION --> ANONYMIZATION
    ANONYMIZATION --> CONTAINER_SEC
    CONTAINER_SEC --> SECRETS
    SECRETS --> COMPLIANCE
    COMPLIANCE --> AUDIT
# llm_guardian_cluster/security/auth.py
from fastapi import HTTPException, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from datetime import datetime, timedelta
import httpx
from typing import Dict, List, Optional
from dataclasses import dataclass
@dataclass
class User:
    """Authenticated principal built from the claims of a verified JWT."""
    user_id: str              # OIDC ``sub`` claim
    email: str                # ``email`` claim
    roles: List[str]          # role names carried in the token
    permissions: List[str]    # explicit permission strings from the token
    organization_id: str      # ``org_id`` claim
    security_clearance: str   # defaults to "public" when the claim is absent
class AuthenticationManager:
    """Validates OAuth2/OIDC-issued JWTs against the provider's JWKS.

    Keys are fetched from ``jwks_url`` and cached for one hour.  BUG FIX:
    the original rejected any token whose key id (kid) was missing from
    the cached JWKS, which locked users out for up to an hour after the
    provider rotated its signing keys; the JWKS is now refetched once on
    a kid miss before the token is rejected.
    """

    def __init__(self,
                 issuer: str,
                 audience: str,
                 jwks_url: str,
                 algorithm: str = "RS256"):
        self.issuer = issuer
        self.audience = audience
        self.jwks_url = jwks_url
        self.algorithm = algorithm
        self.jwks_cache: Dict = {}
        self.cache_expiry: Optional[datetime] = None

    async def get_jwks(self, force_refresh: bool = False) -> Dict:
        """Fetch and cache JWKS from the OAuth provider.

        Args:
            force_refresh: bypass the one-hour cache (used after a kid
                miss so key rotation is picked up immediately).
        """
        if (force_refresh or
                self.cache_expiry is None or
                datetime.utcnow() > self.cache_expiry):
            async with httpx.AsyncClient() as client:
                response = await client.get(self.jwks_url)
                response.raise_for_status()
                self.jwks_cache = response.json()
            self.cache_expiry = datetime.utcnow() + timedelta(hours=1)
        return self.jwks_cache

    @staticmethod
    def _find_signing_key(jwks: Dict, kid: str) -> Optional[Dict]:
        """Return the JWK whose ``kid`` matches, or None if absent."""
        for key in jwks.get("keys", []):
            if key.get("kid") == kid:
                return key
        return None

    async def verify_token(self, token: str) -> User:
        """Verify a JWT and build a :class:`User` from its claims.

        Raises:
            HTTPException: 401 when the token is malformed, signed by an
                unknown key, or fails signature/audience/issuer checks.
        """
        try:
            # Decode the (unverified) header only to learn which key signed it.
            unverified_header = jwt.get_unverified_header(token)
            kid = unverified_header.get("kid")
            if not kid:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Token missing key ID"
                )
            # Look up the signing key; on a cache miss, refresh the JWKS
            # once so provider-side key rotation does not lock users out.
            jwks = await self.get_jwks()
            signing_key = self._find_signing_key(jwks, kid)
            if signing_key is None:
                jwks = await self.get_jwks(force_refresh=True)
                signing_key = self._find_signing_key(jwks, kid)
            if not signing_key:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Unable to find appropriate key"
                )
            # Full signature + audience + issuer verification.
            payload = jwt.decode(
                token,
                signing_key,
                algorithms=[self.algorithm],
                audience=self.audience,
                issuer=self.issuer
            )
            # Map standard and custom claims onto the internal User model.
            user = User(
                user_id=payload.get("sub"),
                email=payload.get("email"),
                roles=payload.get("roles", []),
                permissions=payload.get("permissions", []),
                organization_id=payload.get("org_id"),
                security_clearance=payload.get("security_clearance", "public")
            )
            return user
        except JWTError as e:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=f"Token validation failed: {str(e)}"
            )
# FastAPI dependency for authentication
security = HTTPBearer()  # extracts the Bearer token from the Authorization header
# Module-level singleton; issuer/audience/JWKS endpoint are fixed at import time.
auth_manager = AuthenticationManager(
    issuer="https://auth.llm-guardian-cluster.com",
    audience="llm-guardian-api",
    jwks_url="https://auth.llm-guardian-cluster.com/.well-known/jwks.json"
)

async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> User:
    """FastAPI dependency to get current authenticated user.

    Raises HTTPException(401) via verify_token on any validation failure.
    """
    return await auth_manager.verify_token(credentials.credentials)
# llm_guardian_cluster/security/rbac.py
from enum import Enum
from typing import List, Set
from functools import wraps
from fastapi import HTTPException, status
class Permission(Enum):
    """Fine-grained permission identifiers in ``resource:action`` form."""
    # Specialist permissions
    SPECIALIST_READ = "specialist:read"
    SPECIALIST_EXECUTE = "specialist:execute"
    SPECIALIST_CONFIGURE = "specialist:configure"
    SPECIALIST_MANAGE = "specialist:manage"
    # Guardian permissions
    GUARDIAN_READ = "guardian:read"
    GUARDIAN_CONFIGURE = "guardian:configure"
    GUARDIAN_MANAGE = "guardian:manage"
    # System permissions
    SYSTEM_READ = "system:read"
    SYSTEM_CONFIGURE = "system:configure"
    SYSTEM_MANAGE = "system:manage"
    SYSTEM_ADMIN = "system:admin"
    # Data permissions
    DATA_READ = "data:read"
    DATA_WRITE = "data:write"
    DATA_EXPORT = "data:export"
    DATA_DELETE = "data:delete"
    # Monitoring permissions
    MONITORING_READ = "monitoring:read"
    MONITORING_CONFIGURE = "monitoring:configure"
class Role(Enum):
    """Assignable user roles; each maps to a permission set in ROLE_PERMISSIONS."""
    # User roles
    VIEWER = "viewer"
    OPERATOR = "operator"
    DEVELOPER = "developer"
    ADMIN = "admin"
    SECURITY_OFFICER = "security_officer"
    COMPLIANCE_OFFICER = "compliance_officer"
# Static role -> permission-set mapping consumed by AccessControl.
# NOTE(review): no role grants DATA_DELETE, and SYSTEM_ADMIN is granted
# only to SECURITY_OFFICER -- confirm these omissions are intentional.
ROLE_PERMISSIONS = {
    # Read-only visibility across specialists, guardians, system, monitoring.
    Role.VIEWER: {
        Permission.SPECIALIST_READ,
        Permission.GUARDIAN_READ,
        Permission.SYSTEM_READ,
        Permission.MONITORING_READ
    },
    # Viewer rights plus specialist execution and data reads.
    Role.OPERATOR: {
        Permission.SPECIALIST_READ,
        Permission.SPECIALIST_EXECUTE,
        Permission.GUARDIAN_READ,
        Permission.SYSTEM_READ,
        Permission.MONITORING_READ,
        Permission.DATA_READ
    },
    # Operator rights plus configuration and data writes.
    Role.DEVELOPER: {
        Permission.SPECIALIST_READ,
        Permission.SPECIALIST_EXECUTE,
        Permission.SPECIALIST_CONFIGURE,
        Permission.GUARDIAN_READ,
        Permission.GUARDIAN_CONFIGURE,
        Permission.SYSTEM_READ,
        Permission.MONITORING_READ,
        Permission.MONITORING_CONFIGURE,
        Permission.DATA_READ,
        Permission.DATA_WRITE
    },
    # Full management of specialists/guardians/system plus data export.
    Role.ADMIN: {
        Permission.SPECIALIST_READ,
        Permission.SPECIALIST_EXECUTE,
        Permission.SPECIALIST_CONFIGURE,
        Permission.SPECIALIST_MANAGE,
        Permission.GUARDIAN_READ,
        Permission.GUARDIAN_CONFIGURE,
        Permission.GUARDIAN_MANAGE,
        Permission.SYSTEM_READ,
        Permission.SYSTEM_CONFIGURE,
        Permission.SYSTEM_MANAGE,
        Permission.MONITORING_READ,
        Permission.MONITORING_CONFIGURE,
        Permission.DATA_READ,
        Permission.DATA_WRITE,
        Permission.DATA_EXPORT
    },
    # Security oversight: system admin plus monitoring and data export.
    Role.SECURITY_OFFICER: {
        Permission.SYSTEM_READ,
        Permission.SYSTEM_ADMIN,
        Permission.MONITORING_READ,
        Permission.MONITORING_CONFIGURE,
        Permission.DATA_READ,
        Permission.DATA_EXPORT
    },
    # Compliance oversight: read and export only.
    Role.COMPLIANCE_OFFICER: {
        Permission.SYSTEM_READ,
        Permission.MONITORING_READ,
        Permission.DATA_READ,
        Permission.DATA_EXPORT
    }
}
class AccessControl:
    """Authorization helpers that map user roles onto concrete permissions."""

    @staticmethod
    def get_user_permissions(user: User) -> Set[Permission]:
        """Return the union of permissions granted by the user's roles.

        Role names that do not correspond to a known Role are ignored.
        """
        granted: Set[Permission] = set()
        for name in user.roles:
            try:
                granted |= ROLE_PERMISSIONS.get(Role(name), set())
            except ValueError:
                # Unknown role names are skipped rather than rejected.
                pass
        return granted

    @staticmethod
    def has_permission(user: User, required_permission: Permission) -> bool:
        """True when any of the user's roles grants the permission."""
        return required_permission in AccessControl.get_user_permissions(user)

    @staticmethod
    def require_permission(required_permission: Permission):
        """Decorator guarding an async endpoint behind a permission.

        Scans the wrapped coroutine's arguments for a User instance;
        raises 401 when none is found and 403 when the user lacks the
        required permission.
        """
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                # Positional arguments take precedence over keyword ones
                # when locating the authenticated user.
                candidates = list(args) + list(kwargs.values())
                user = next(
                    (c for c in candidates if isinstance(c, User)), None
                )
                if not user:
                    raise HTTPException(
                        status_code=status.HTTP_401_UNAUTHORIZED,
                        detail="Authentication required"
                    )
                if not AccessControl.has_permission(user, required_permission):
                    raise HTTPException(
                        status_code=status.HTTP_403_FORBIDDEN,
                        detail=f"Permission {required_permission.value} required"
                    )
                return await func(*args, **kwargs)
            return wrapper
        return decorator
# llm_guardian_cluster/security/encryption.py
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import serialization
import base64
import os
# BUG FIX: ``bytes`` and ``str`` are builtins, not members of typing;
# ``from typing import bytes, str, Optional`` raised ImportError at
# module import time and made this module unusable.
from typing import Optional
class EncryptionManager:
    """Symmetric (Fernet) and asymmetric (RSA-OAEP/SHA-256) encryption helpers.

    A per-instance RSA-2048 key pair is generated at construction time;
    symmetric operations use the supplied Fernet key or a fresh one.
    """

    def __init__(self, master_key: Optional[bytes] = None):
        # An ephemeral key is generated when none is supplied -- suitable
        # for dev/test; production should inject a managed key.
        if master_key is None:
            master_key = self._generate_master_key()
        self.master_key = master_key
        self.fernet = Fernet(master_key)
        # RSA key pair for asymmetric encryption of small payloads.
        self.private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048
        )
        self.public_key = self.private_key.public_key()

    @staticmethod
    def _generate_master_key() -> bytes:
        """Generate a new Fernet key for symmetric encryption."""
        return Fernet.generate_key()

    def encrypt_symmetric(self, data: str) -> str:
        """Encrypt *data* with Fernet; returns base64 text."""
        encrypted_data = self.fernet.encrypt(data.encode())
        return base64.b64encode(encrypted_data).decode()

    def decrypt_symmetric(self, encrypted_data: str) -> str:
        """Inverse of :meth:`encrypt_symmetric`."""
        encrypted_bytes = base64.b64decode(encrypted_data.encode())
        decrypted_data = self.fernet.decrypt(encrypted_bytes)
        return decrypted_data.decode()

    def encrypt_asymmetric(self, data: str, public_key: Optional[bytes] = None) -> str:
        """Encrypt *data* with RSA-OAEP(SHA-256); returns base64 text.

        Args:
            public_key: optional PEM-encoded recipient key; defaults to
                this instance's own public key.
        """
        if public_key is None:
            pub_key = self.public_key
        else:
            pub_key = serialization.load_pem_public_key(public_key)
        encrypted_data = pub_key.encrypt(
            data.encode(),
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None
            )
        )
        return base64.b64encode(encrypted_data).decode()

    def decrypt_asymmetric(self, encrypted_data: str) -> str:
        """Decrypt RSA-OAEP(SHA-256) ciphertext made for our public key."""
        encrypted_bytes = base64.b64decode(encrypted_data.encode())
        decrypted_data = self.private_key.decrypt(
            encrypted_bytes,
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None
            )
        )
        return decrypted_data.decode()

    def get_public_key_pem(self) -> str:
        """Get the public key in PEM (SubjectPublicKeyInfo) format.

        BUG FIX: the original called ``public_serialization``, which does
        not exist on cryptography public-key objects; the correct API is
        ``public_bytes``, so this method always raised AttributeError.
        """
        pem = self.public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )
        return pem.decode()
# Database field encryption
class FieldEncryption:
    """Thin wrapper applying symmetric encryption to individual DB fields."""

    def __init__(self, encryption_manager: EncryptionManager):
        self.encryption_manager = encryption_manager

    def encrypt_pii_field(self, value: str) -> str:
        """Encrypt a PII column value; empty values pass through unchanged."""
        return self.encryption_manager.encrypt_symmetric(value) if value else value

    def decrypt_pii_field(self, encrypted_value: str) -> str:
        """Decrypt a PII column value; empty values pass through unchanged."""
        if encrypted_value:
            return self.encryption_manager.decrypt_symmetric(encrypted_value)
        return encrypted_value
# llm_guardian_cluster/security/pii_protection.py
import re
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import RecognizerResult, OperatorConfig
@dataclass
class PIIDetectionResult:
    """One detected PII span within analyzed text."""
    entity_type: str   # e.g. "EMAIL_ADDRESS", or a custom pattern name
    start: int         # character offset of the span (inclusive)
    end: int           # character offset of the span (exclusive)
    confidence: float  # Presidio score, or fixed 0.9 for regex matches
    text: str          # the matched substring
class PIIProtector:
    """Detects, anonymizes, and pseudonymizes PII in free text.

    Standard entities are found with Microsoft Presidio; additional
    domain-specific patterns (API keys, IPs, model names) are matched
    with regular expressions.
    """

    def __init__(self):
        self.analyzer = AnalyzerEngine()
        self.anonymizer = AnonymizerEngine()
        # Custom PII patterns
        # NOTE(review): API_KEY matches ANY run of 32+ alphanumerics, so
        # long hashes/ids will also be flagged; IP_ADDRESS duplicates
        # Presidio's own recognizer and can produce overlapping results
        # -- confirm both are intended.
        self.custom_patterns = {
            'API_KEY': re.compile(r'[A-Za-z0-9]{32,}'),
            'IP_ADDRESS': re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'),
            'MODEL_NAME': re.compile(r'(?:gpt-|claude-|llama-)[A-Za-z0-9\-]+'),
        }

    def detect_pii(self, text: str, language: str = 'en') -> List[PIIDetectionResult]:
        """Detect PII in text using Presidio and custom patterns.

        Returns one result per detection; overlapping/duplicate spans are
        NOT deduplicated here.
        """
        results = []
        # Use Presidio for standard PII detection
        presidio_results = self.analyzer.analyze(
            text=text,
            language=language,
            entities=["PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "CREDIT_CARD",
                      "US_SSN", "US_DRIVER_LICENSE", "US_PASSPORT", "IBAN_CODE",
                      "IP_ADDRESS", "URL"]
        )
        for result in presidio_results:
            results.append(PIIDetectionResult(
                entity_type=result.entity_type,
                start=result.start,
                end=result.end,
                confidence=result.score,
                text=text[result.start:result.end]
            ))
        # Add custom pattern detection
        for pattern_name, pattern in self.custom_patterns.items():
            for match in pattern.finditer(text):
                results.append(PIIDetectionResult(
                    entity_type=pattern_name,
                    start=match.start(),
                    end=match.end(),
                    confidence=0.9,  # fixed heuristic score for regex hits
                    text=match.group()
                ))
        return results

    def anonymize_pii(self, text: str, anonymization_config: Optional[Dict] = None) -> str:
        """Anonymize PII in text.

        Args:
            anonymization_config: mapping of entity type to Presidio
                OperatorConfig; defaults to replacing each entity with a
                bracketed placeholder such as ``[EMAIL]``.
        """
        if anonymization_config is None:
            anonymization_config = {
                "PERSON": OperatorConfig("replace", {"new_value": "[PERSON]"}),
                "EMAIL_ADDRESS": OperatorConfig("replace", {"new_value": "[EMAIL]"}),
                "PHONE_NUMBER": OperatorConfig("replace", {"new_value": "[PHONE]"}),
                "CREDIT_CARD": OperatorConfig("replace", {"new_value": "[CREDIT_CARD]"}),
                "US_SSN": OperatorConfig("replace", {"new_value": "[SSN]"}),
                "IP_ADDRESS": OperatorConfig("replace", {"new_value": "[IP_ADDRESS]"}),
                "API_KEY": OperatorConfig("replace", {"new_value": "[API_KEY]"}),
            }
        # Detect PII first
        pii_results = self.detect_pii(text)
        # Convert to Presidio format
        presidio_results = [
            RecognizerResult(
                entity_type=result.entity_type,
                start=result.start,
                end=result.end,
                score=result.confidence
            )
            for result in pii_results
        ]
        # Anonymize -- overlapping spans are resolved by Presidio's
        # AnonymizerEngine.
        anonymized_result = self.anonymizer.anonymize(
            text=text,
            analyzer_results=presidio_results,
            operators=anonymization_config
        )
        return anonymized_result.text

    def pseudonymize_pii(self, text: str) -> Tuple[str, Dict[str, str]]:
        """Pseudonymize PII while maintaining referential integrity.

        Returns:
            The rewritten text and a mapping from original PII strings to
            their placeholders (e.g. ``{"a@x.com": "[EMAIL_ADDRESS_1]"}``).
        """
        pii_results = self.detect_pii(text)
        pseudonym_map = {}
        pseudonymized_text = text
        # Sort by start position in reverse order to maintain indices
        # while replacing spans in place.
        pii_results.sort(key=lambda x: x.start, reverse=True)
        entity_counters = {}
        for result in pii_results:
            entity_type = result.entity_type
            original_text = result.text
            # Generate consistent pseudonym -- the same original string
            # always maps to the same placeholder.
            if original_text not in pseudonym_map:
                if entity_type not in entity_counters:
                    entity_counters[entity_type] = 0
                entity_counters[entity_type] += 1
                pseudonym = f"[{entity_type}_{entity_counters[entity_type]}]"
                pseudonym_map[original_text] = pseudonym
            # Replace in text
            pseudonymized_text = (
                pseudonymized_text[:result.start] +
                pseudonym_map[original_text] +
                pseudonymized_text[result.end:]
            )
        return pseudonymized_text, pseudonym_map
# llm_guardian_cluster/compliance/gdpr.py
from datetime import datetime, timedelta
from typing import List, Dict, Optional
from dataclasses import dataclass
from enum import Enum
class DataProcessingPurpose(Enum):
    """Why personal data is processed; keys into the retention policies."""
    TRAINING = "training"
    INFERENCE = "inference"
    MONITORING = "monitoring"
    ANALYTICS = "analytics"
    IMPROVEMENT = "improvement"
class LegalBasis(Enum):
    """GDPR Article 6 lawful bases for processing personal data."""
    CONSENT = "consent"
    CONTRACT = "contract"
    LEGAL_OBLIGATION = "legal_obligation"
    VITAL_INTERESTS = "vital_interests"
    PUBLIC_TASK = "public_task"
    LEGITIMATE_INTERESTS = "legitimate_interests"
@dataclass
class DataProcessingRecord:
    """One record-of-processing entry for a single data subject."""
    id: str
    data_subject_id: str
    purpose: DataProcessingPurpose
    legal_basis: LegalBasis
    data_categories: List[str]
    retention_period: timedelta
    processing_start: datetime          # start of the retention clock
    consent_given: bool                 # checked during erasure evaluation
    consent_timestamp: Optional[datetime]
    data_source: str
    third_party_sharing: bool
    automated_decision_making: bool
class GDPRComplianceManager:
    """In-memory register of data-processing activities with GDPR helpers.

    Tracks processing records, applies per-purpose retention policies,
    and services the data-subject rights requests of GDPR Articles 15-21.
    """

    def __init__(self):
        self.processing_records: Dict[str, DataProcessingRecord] = {}
        # Per-purpose retention windows; purposes not listed default to
        # one year (see check_retention_compliance).
        self.retention_policies = {
            DataProcessingPurpose.TRAINING: timedelta(days=2555),  # 7 years
            DataProcessingPurpose.INFERENCE: timedelta(days=30),
            DataProcessingPurpose.MONITORING: timedelta(days=365),
            DataProcessingPurpose.ANALYTICS: timedelta(days=365),
            DataProcessingPurpose.IMPROVEMENT: timedelta(days=1095)  # 3 years
        }

    def record_data_processing(self, record: DataProcessingRecord) -> str:
        """Record data processing activity for GDPR compliance."""
        self.processing_records[record.id] = record
        return record.id

    def check_retention_compliance(self) -> List[str]:
        """Return ids of records whose retention period has expired."""
        expired_records = []
        current_time = datetime.utcnow()
        for record_id, record in self.processing_records.items():
            retention_period = self.retention_policies.get(record.purpose, timedelta(days=365))
            expiry_time = record.processing_start + retention_period
            if current_time > expiry_time:
                expired_records.append(record_id)
        return expired_records

    def handle_data_subject_request(self, request_type: str, data_subject_id: str) -> Dict:
        """Dispatch a GDPR data-subject request to the matching handler.

        Raises:
            ValueError: for unrecognized request types.
        """
        subject_records = [
            record for record in self.processing_records.values()
            if record.data_subject_id == data_subject_id
        ]
        handlers = {
            "access": self._handle_access_request,
            "rectification": self._handle_rectification_request,
            "erasure": self._handle_erasure_request,
            "portability": self._handle_portability_request,
            "objection": self._handle_objection_request,
        }
        handler = handlers.get(request_type)
        if handler is None:
            raise ValueError(f"Unknown request type: {request_type}")
        return handler(subject_records)

    def _handle_access_request(self, records: List[DataProcessingRecord]) -> Dict:
        """Handle a data subject access request (Art. 15)."""
        return {
            "request_type": "access",
            "processing_activities": [
                {
                    "purpose": record.purpose.value,
                    "legal_basis": record.legal_basis.value,
                    "data_categories": record.data_categories,
                    "retention_period": record.retention_period.days,
                    "processing_start": record.processing_start.isoformat(),
                    "third_party_sharing": record.third_party_sharing,
                    "automated_decision_making": record.automated_decision_making
                }
                for record in records
            ],
            "data_sources": list(set(record.data_source for record in records)),
            "response_timestamp": datetime.utcnow().isoformat()
        }

    def _handle_rectification_request(self, records: List[DataProcessingRecord]) -> Dict:
        """Acknowledge a rectification request (Art. 16).

        BUG FIX: the dispatcher referenced this handler but it was never
        defined, so rectification requests raised AttributeError.  Actual
        field correction must happen in the source systems; this returns
        the records whose data is subject to rectification.
        """
        return {
            "request_type": "rectification",
            "affected_records": [record.id for record in records],
            "response_timestamp": datetime.utcnow().isoformat()
        }

    def _handle_erasure_request(self, records: List[DataProcessingRecord]) -> Dict:
        """Handle a right-to-be-forgotten request (Art. 17)."""
        erasable_records = []
        non_erasable_records = []
        for record in records:
            # Check if erasure is legally required or permitted
            if self._can_erase_data(record):
                erasable_records.append(record.id)
            else:
                non_erasable_records.append({
                    "record_id": record.id,
                    "reason": self._get_non_erasure_reason(record)
                })
        return {
            "request_type": "erasure",
            "erasable_records": erasable_records,
            "non_erasable_records": non_erasable_records,
            "response_timestamp": datetime.utcnow().isoformat()
        }

    def _handle_portability_request(self, records: List[DataProcessingRecord]) -> Dict:
        """Export processing metadata in a portable form (Art. 20).

        BUG FIX: referenced by the dispatcher but previously undefined.
        """
        return {
            "request_type": "portability",
            "records": [
                {
                    "id": record.id,
                    "purpose": record.purpose.value,
                    "data_categories": record.data_categories,
                    "data_source": record.data_source,
                    "processing_start": record.processing_start.isoformat()
                }
                for record in records
            ],
            "response_timestamp": datetime.utcnow().isoformat()
        }

    def _handle_objection_request(self, records: List[DataProcessingRecord]) -> Dict:
        """Handle an objection to processing (Art. 21).

        BUG FIX: referenced by the dispatcher but previously undefined.
        Only records processed under legitimate interests are reported as
        objectionable; other legal bases are unaffected by objection.
        """
        objectionable = [
            record.id for record in records
            if record.legal_basis == LegalBasis.LEGITIMATE_INTERESTS
        ]
        return {
            "request_type": "objection",
            "objectionable_records": objectionable,
            "response_timestamp": datetime.utcnow().isoformat()
        }

    def _can_erase_data(self, record: DataProcessingRecord) -> bool:
        """Determine if data can be erased under GDPR.

        Erasure is permitted when consent was the legal basis and has
        been withdrawn, or when the retention period has expired.
        """
        if record.legal_basis == LegalBasis.CONSENT and not record.consent_given:
            return True
        # Check if retention period has expired
        retention_period = self.retention_policies.get(record.purpose, timedelta(days=365))
        if datetime.utcnow() > record.processing_start + retention_period:
            return True
        return False

    def _get_non_erasure_reason(self, record: DataProcessingRecord) -> str:
        """Get a human-readable reason why data cannot be erased."""
        if record.legal_basis == LegalBasis.LEGAL_OBLIGATION:
            return "Data retention required by legal obligation"
        elif record.legal_basis == LegalBasis.LEGITIMATE_INTERESTS:
            return "Legitimate interests override erasure request"
        elif record.purpose == DataProcessingPurpose.TRAINING:
            return "Data required for model integrity and performance"
        else:
            return "Erasure not applicable under current legal basis"
# llm_guardian_cluster/compliance/audit.py
from datetime import datetime
from typing import Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
import json
import hashlib
class AuditEventType(Enum):
    """Categories of events captured in the audit trail."""
    DATA_ACCESS = "data_access"
    DATA_MODIFICATION = "data_modification"
    DATA_DELETION = "data_deletion"
    USER_AUTHENTICATION = "user_authentication"
    PERMISSION_CHANGE = "permission_change"
    SYSTEM_CONFIGURATION = "system_configuration"
    MODEL_TRAINING = "model_training"
    MODEL_INFERENCE = "model_inference"
    SECURITY_INCIDENT = "security_incident"
    COMPLIANCE_CHECK = "compliance_check"
@dataclass
class AuditEvent:
    """A single entry in the security audit trail."""
    event_id: str
    timestamp: datetime
    event_type: AuditEventType
    user_id: Optional[str]              # None for system-originated events
    resource_id: str
    action: str
    result: str
    ip_address: Optional[str]
    user_agent: Optional[str]
    additional_data: Dict[str, Any]

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a JSON-friendly dict (timestamp as ISO-8601)."""
        return dict(
            event_id=self.event_id,
            timestamp=self.timestamp.isoformat(),
            event_type=self.event_type.value,
            user_id=self.user_id,
            resource_id=self.resource_id,
            action=self.action,
            result=self.result,
            ip_address=self.ip_address,
            user_agent=self.user_agent,
            additional_data=self.additional_data,
        )

    def calculate_integrity_hash(self) -> str:
        """SHA-256 over the canonical (sorted-key) JSON serialization."""
        canonical = json.dumps(self.to_dict(), sort_keys=True)
        return hashlib.sha256(canonical.encode()).hexdigest()
class AuditLogger:
    """Tamper-evident audit logger using a hash chain.

    Each stored record carries the SHA-256 of its own payload plus a
    chain hash linking it to the previous record, so later modification
    of any stored event breaks verification.
    """

    def __init__(self, storage_backend):
        # storage_backend is duck-typed: must provide async
        # store_audit_record(dict) and get_audit_records(start, end).
        self.storage = storage_backend
        # Chain hash of the most recently logged event (None before the
        # first event logged by this instance).
        self.previous_hash = None

    async def log_event(self, event: AuditEvent) -> str:
        """Log audit event with integrity protection; returns its id."""
        # Calculate integrity hash over the event payload
        event_hash = event.calculate_integrity_hash()
        # Chain with previous event for integrity; the first event's
        # chain hash is just its own event hash.
        if self.previous_hash:
            chained_data = f"{self.previous_hash}{event_hash}"
            chain_hash = hashlib.sha256(chained_data.encode()).hexdigest()
        else:
            chain_hash = event_hash
        # Store event with integrity information alongside the payload
        audit_record = {
            **event.to_dict(),
            "event_hash": event_hash,
            "chain_hash": chain_hash,
            "previous_hash": self.previous_hash
        }
        await self.storage.store_audit_record(audit_record)
        self.previous_hash = chain_hash
        return event.event_id

    async def verify_audit_trail_integrity(self, start_date: datetime,
                                           end_date: datetime) -> bool:
        """Verify integrity of audit trail for a given period.

        Recomputes each record's event hash and walks the chain; returns
        False on the first mismatch.  NOTE(review): verification assumes
        the window begins at the start of the chain -- the first record
        in the window is expected to have previous_hash None; a window
        starting mid-chain will fail.  Confirm callers pass full-chain
        windows.
        """
        records = await self.storage.get_audit_records(start_date, end_date)
        if not records:
            return True
        # Sort by timestamp to reconstruct chain order
        records.sort(key=lambda r: r["timestamp"])
        previous_hash = None
        for record in records:
            # Recalculate event hash over payload fields only (the three
            # integrity fields are excluded, mirroring log_event)
            event_dict = {k: v for k, v in record.items()
                          if k not in ["event_hash", "chain_hash", "previous_hash"]}
            calculated_hash = hashlib.sha256(
                json.dumps(event_dict, sort_keys=True).encode()
            ).hexdigest()
            if calculated_hash != record["event_hash"]:
                return False
            # Verify chain integrity (link to predecessor)
            if previous_hash != record["previous_hash"]:
                return False
            # Verify chain hash recomputation
            if previous_hash:
                chained_data = f"{previous_hash}{calculated_hash}"
                expected_chain_hash = hashlib.sha256(chained_data.encode()).hexdigest()
            else:
                expected_chain_hash = calculated_hash
            if expected_chain_hash != record["chain_hash"]:
                return False
            previous_hash = record["chain_hash"]
        return True
# k8s/security/pod-security-policy.yaml
# Baseline hardened policy: non-root, no privilege escalation, all
# capabilities dropped, read-only root filesystem.
# NOTE(review): PodSecurityPolicy (policy/v1beta1) was removed in
# Kubernetes v1.25 -- migrate to Pod Security Admission or a policy
# engine (Kyverno/Gatekeeper) when upgrading.
apiVersion: policy/v1beta1
kind: PodSecurityPolicy
metadata:
  name: llm-guardian-psp
spec:
  privileged: false
  allowPrivilegeEscalation: false
  # Drop every Linux capability by default.
  requiredDropCapabilities:
    - ALL
  # Only non-host volume types are permitted.
  volumes:
    - "configMap"
    - "emptyDir"
    - "projected"
    - "secret"
    - "downwardAPI"
    - "persistentVolumeClaim"
  runAsUser:
    rule: "MustRunAsNonRoot"
  runAsGroup:
    rule: "MustRunAs"
    ranges:
      - min: 1000
        max: 65535
  seLinux:
    rule: "RunAsAny"
  supplementalGroups:
    rule: "MustRunAs"
    ranges:
      - min: 1000
        max: 65535
  fsGroup:
    rule: "RunAsAny"
  readOnlyRootFilesystem: true
---
# GPU workload variant: relaxes the baseline only where NVIDIA GPU access
# requires it (SYS_ADMIN capability and NVIDIA driver/device hostPaths).
apiVersion: policy/v1beta1
kind: PodSecurityPolicy
metadata:
  name: llm-guardian-gpu-psp
spec:
  privileged: false
  allowPrivilegeEscalation: false
  allowedCapabilities:
    - SYS_ADMIN  # Required for NVIDIA GPU access
  volumes:
    - "configMap"
    - "emptyDir"
    - "projected"
    - "secret"
    - "downwardAPI"
    - "persistentVolumeClaim"
    - "hostPath"  # Required for GPU device access
  # Only NVIDIA device and driver paths may be host-mounted.
  allowedHostPaths:
    - pathPrefix: "/dev/nvidia"
      readOnly: false
    - pathPrefix: "/usr/local/nvidia"
      readOnly: true
  runAsUser:
    rule: "MustRunAsNonRoot"
  runAsGroup:
    rule: "MustRunAs"
    ranges:
      - min: 1000
        max: 65535
  seLinux:
    rule: "RunAsAny"
  supplementalGroups:
    rule: "MustRunAs"
    ranges:
      - min: 1000
        max: 65535
  fsGroup:
    rule: "RunAsAny"
# k8s/security/network-policy.yaml
# Allow-list for llm-guardian pods: only the listed peers/ports are
# reachable; everything else is blocked by the deny-all policy below.
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: llm-guardian-network-policy
  namespace: llm-guardian
spec:
  podSelector:
    matchLabels:
      app: llm-guardian
  policyTypes:
    - Ingress
    - Egress
  ingress:
    # Allow ingress from API gateway
    - from:
        - podSelector:
            matchLabels:
              app: api-gateway
      ports:
        - protocol: TCP
          port: 8080
    # Allow ingress from monitoring (scrape port)
    - from:
        - namespaceSelector:
            matchLabels:
              name: monitoring
      ports:
        - protocol: TCP
          port: 9090
  egress:
    # Allow egress to databases
    - to:
        - podSelector:
            matchLabels:
              app: postgresql
      ports:
        - protocol: TCP
          port: 5432
    # Allow egress to vector database
    - to:
        - podSelector:
            matchLabels:
              app: chromadb
      ports:
        - protocol: TCP
          port: 8000
    # Allow egress to message queue
    - to:
        - podSelector:
            matchLabels:
              app: kafka
      ports:
        - protocol: TCP
          port: 9092
    # Allow DNS resolution
    # NOTE(review): an explicit empty ``to: []`` may select no peers on
    # strict CNI implementations; omitting ``to`` entirely is the usual
    # way to allow any destination -- confirm against the cluster's CNI.
    - to: []
      ports:
        - protocol: UDP
          port: 53
        - protocol: TCP
          port: 53
---
# Namespace-wide default deny: selects every pod and declares both
# policy types with no rules, blocking all traffic unless another
# policy explicitly allows it.
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: deny-all-default
  namespace: llm-guardian
spec:
  podSelector: {}
  policyTypes:
    - Ingress
    - Egress
# k8s/security/external-secrets.yaml
# SecretStore pointing External Secrets Operator at HashiCorp Vault,
# authenticating with the Kubernetes auth method.
apiVersion: external-secrets.io/v1beta1
kind: SecretStore
metadata:
  name: vault-secret-store
  namespace: llm-guardian
spec:
  provider:
    vault:
      server: "https://vault.company.com"
      path: "secret"
      version: "v2"  # KV v2 secrets engine
      auth:
        kubernetes:
          mountPath: "kubernetes"
          role: "llm-guardian-role"
          serviceAccountRef:
            name: "external-secrets-sa"
---
# Syncs PostgreSQL credentials from Vault into the database-secret
# Kubernetes Secret.
apiVersion: external-secrets.io/v1beta1
kind: ExternalSecret
metadata:
  name: database-credentials
  namespace: llm-guardian
spec:
  # NOTE(review): a 15s refresh polls Vault very aggressively -- confirm
  # this is intentional versus a longer interval (e.g. 1h).
  refreshInterval: 15s
  secretStoreRef:
    name: vault-secret-store
    kind: SecretStore
  target:
    name: database-secret
    creationPolicy: Owner
  data:
    - secretKey: username
      remoteRef:
        key: database/postgresql
        property: username
    - secretKey: password
      remoteRef:
        key: database/postgresql
        property: password
    - secretKey: host
      remoteRef:
        key: database/postgresql
        property: host
---
# Syncs third-party API keys and the master encryption key from Vault
# into the api-keys-secret Kubernetes Secret.
apiVersion: external-secrets.io/v1beta1
kind: ExternalSecret
metadata:
  name: api-keys
  namespace: llm-guardian
spec:
  refreshInterval: 15s
  secretStoreRef:
    name: vault-secret-store
    kind: SecretStore
  target:
    name: api-keys-secret
    creationPolicy: Owner
  data:
    - secretKey: openai_api_key
      remoteRef:
        key: api-keys/external
        property: openai_api_key
    - secretKey: encryption_key
      remoteRef:
        key: encryption/master
        property: key

Next: Performance Optimization