Skip to content

Latest commit

 

History

History
1121 lines (977 loc) · 33.8 KB

File metadata and controls

1121 lines (977 loc) · 33.8 KB

Security & Compliance

Overview

Security and compliance are fundamental to the LLM Guardian Cluster's design. This comprehensive framework ensures data protection, privacy compliance, and robust security measures across all system components while maintaining the high performance and reliability required for AI workloads.

Security Architecture

graph TB
    subgraph "External Security Layer"
        WAF[Web Application Firewall]
        DDoS[DDoS Protection]
        CDN[Content Delivery Network]
    end

    subgraph "Network Security Layer"
        LB[Load Balancer + TLS]
        VPN[VPN Gateway]
        FW[Network Firewall]
        IPS[Intrusion Prevention]
    end

    subgraph "Authentication & Authorization Layer"
        OAUTH[OAuth2/OIDC Provider]
        RBAC[Role-Based Access Control]
        MFA[Multi-Factor Authentication]
        JWT[JWT Token Validation]
    end

    subgraph "Application Security Layer"
        API_SEC[API Security Gateway]
        INPUT_VAL[Input Validation]
        OUTPUT_FILTER[Output Filtering]
        RATE_LIMIT[Rate Limiting]
    end

    subgraph "Data Security Layer"
        ENCRYPT[Encryption at Rest]
        TLS[TLS in Transit]
        TOKENIZATION[Data Tokenization]
        ANONYMIZATION[PII Anonymization]
    end

    subgraph "Infrastructure Security Layer"
        CONTAINER_SEC[Container Security]
        SECRETS[Secrets Management]
        COMPLIANCE[Compliance Engine]
        AUDIT[Audit Logging]
    end

    WAF --> LB
    DDoS --> LB
    CDN --> WAF

    LB --> OAUTH
    VPN --> FW
    FW --> IPS

    OAUTH --> RBAC
    RBAC --> MFA
    MFA --> JWT

    JWT --> API_SEC
    API_SEC --> INPUT_VAL
    INPUT_VAL --> OUTPUT_FILTER
    OUTPUT_FILTER --> RATE_LIMIT

    RATE_LIMIT --> ENCRYPT
    ENCRYPT --> TLS
    TLS --> TOKENIZATION
    TOKENIZATION --> ANONYMIZATION

    ANONYMIZATION --> CONTAINER_SEC
    CONTAINER_SEC --> SECRETS
    SECRETS --> COMPLIANCE
    COMPLIANCE --> AUDIT
Loading

Authentication & Authorization

OAuth2/OIDC Implementation

# llm_guardian_cluster/security/auth.py
from fastapi import HTTPException, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from datetime import datetime, timedelta
import httpx
from typing import Dict, List, Optional
from dataclasses import dataclass

@dataclass
class User:
    """Authenticated principal populated from the claims of a verified JWT."""

    # Subject identifier, read from the "sub" claim (None if the claim is absent).
    user_id: str
    # Read from the "email" claim (None if the claim is absent).
    email: str
    # Role names from the "roles" claim; empty list when the claim is absent.
    roles: List[str]
    # Permission strings from the "permissions" claim; empty list when absent.
    permissions: List[str]
    # Read from the "org_id" claim (None if the claim is absent).
    organization_id: str
    # Read from the "security_clearance" claim; defaults to "public".
    security_clearance: str

class AuthenticationManager:
    """Validate bearer JWTs against the signing keys published by the OAuth provider.

    The provider's JWKS document is fetched over HTTP and cached in memory.
    Tokens are accepted only when their ``kid`` header matches a published key
    and signature, audience and issuer all verify.
    """

    # How long a fetched JWKS document is served from the cache.
    JWKS_CACHE_TTL = timedelta(hours=1)
    # Minimum cache age before an unknown ``kid`` may trigger an early refetch.
    # Throttles refetches so bogus ``kid`` values cannot force one per request.
    JWKS_MIN_REFRESH_INTERVAL = timedelta(minutes=1)

    def __init__(self,
                 issuer: str,
                 audience: str,
                 jwks_url: str,
                 algorithm: str = "RS256"):
        """Store the expected issuer/audience, the JWKS endpoint and the accepted algorithm."""
        self.issuer = issuer
        self.audience = audience
        self.jwks_url = jwks_url
        self.algorithm = algorithm
        self.jwks_cache = {}
        self.cache_expiry = None

    async def get_jwks(self, force_refresh: bool = False) -> Dict:
        """Return the provider's JWKS document, fetching it when needed.

        Args:
            force_refresh: When True, bypass the cache and refetch even if the
                cached document has not expired yet.

        Raises:
            httpx.HTTPError: If the JWKS endpoint cannot be reached or answers
                with an error status.
        """
        cache_is_stale = (self.cache_expiry is None or
                          datetime.utcnow() > self.cache_expiry)

        if force_refresh or cache_is_stale:
            async with httpx.AsyncClient() as client:
                response = await client.get(self.jwks_url)
                response.raise_for_status()

            self.jwks_cache = response.json()
            self.cache_expiry = datetime.utcnow() + self.JWKS_CACHE_TTL

        return self.jwks_cache

    @staticmethod
    def _find_signing_key(jwks: Dict, kid: str) -> Optional[Dict]:
        """Return the JWKS entry whose ``kid`` equals *kid*, or None."""
        for key in jwks.get("keys", []):
            if key.get("kid") == kid:
                return key
        return None

    def _early_refresh_allowed(self) -> bool:
        """Tell whether the cache is old enough to be refetched before it expires."""
        if self.cache_expiry is None:
            return True
        fetched_at = self.cache_expiry - self.JWKS_CACHE_TTL
        return datetime.utcnow() - fetched_at >= self.JWKS_MIN_REFRESH_INTERVAL

    async def verify_token(self, token: str) -> User:
        """Verify a JWT and build a ``User`` from its claims.

        Raises:
            HTTPException: 401 when the token has no ``kid``, no matching
                signing key is published, or validation fails.
        """
        try:
            # The header is read unverified only to select the signing key.
            unverified_header = jwt.get_unverified_header(token)
            kid = unverified_header.get("kid")

            if not kid:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Token missing key ID"
                )

            signing_key = self._find_signing_key(await self.get_jwks(), kid)

            # The provider may have rotated its keys since the cache was
            # filled: refetch once (rate-limited) before rejecting the token.
            if signing_key is None and self._early_refresh_allowed():
                refreshed = await self.get_jwks(force_refresh=True)
                signing_key = self._find_signing_key(refreshed, kid)

            if not signing_key:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Unable to find appropriate key"
                )

            # Signature, audience and issuer are all checked by decode().
            payload = jwt.decode(
                token,
                signing_key,
                algorithms=[self.algorithm],
                audience=self.audience,
                issuer=self.issuer
            )

            return User(
                user_id=payload.get("sub"),
                email=payload.get("email"),
                roles=payload.get("roles", []),
                permissions=payload.get("permissions", []),
                organization_id=payload.get("org_id"),
                security_clearance=payload.get("security_clearance", "public")
            )

        except JWTError as e:
            # Chain the cause so the original validation error stays in tracebacks.
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=f"Token validation failed: {str(e)}"
            ) from e

# FastAPI dependency for authentication
# `security` extracts the bearer credentials that get_current_user consumes.
security = HTTPBearer()
# Single module-level manager; issuer, audience and JWKS URL are hard-coded here.
# NOTE(review): presumably these should come from configuration per environment — confirm.
auth_manager = AuthenticationManager(
    issuer="https://auth.llm-guardian-cluster.com",
    audience="llm-guardian-api",
    jwks_url="https://auth.llm-guardian-cluster.com/.well-known/jwks.json"
)

async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> User:
    """Resolve the request's bearer credentials into the authenticated ``User``.

    Intended for use as a FastAPI dependency; token verification is delegated
    to the module-level ``auth_manager``.
    """
    bearer_token = credentials.credentials
    current_user = await auth_manager.verify_token(bearer_token)
    return current_user

Role-Based Access Control (RBAC)

# llm_guardian_cluster/security/rbac.py
from enum import Enum
from typing import List, Set
from functools import wraps
from fastapi import HTTPException, status

class Permission(Enum):
    """Fine-grained permissions, encoded as "<area>:<action>" strings."""

    # Specialist permissions
    SPECIALIST_READ = "specialist:read"
    SPECIALIST_EXECUTE = "specialist:execute"
    SPECIALIST_CONFIGURE = "specialist:configure"
    SPECIALIST_MANAGE = "specialist:manage"

    # Guardian permissions
    GUARDIAN_READ = "guardian:read"
    GUARDIAN_CONFIGURE = "guardian:configure"
    GUARDIAN_MANAGE = "guardian:manage"

    # System permissions
    SYSTEM_READ = "system:read"
    SYSTEM_CONFIGURE = "system:configure"
    SYSTEM_MANAGE = "system:manage"
    SYSTEM_ADMIN = "system:admin"

    # Data permissions
    DATA_READ = "data:read"
    DATA_WRITE = "data:write"
    DATA_EXPORT = "data:export"
    # NOTE(review): DATA_DELETE is not granted to any role in ROLE_PERMISSIONS — confirm intended.
    DATA_DELETE = "data:delete"

    # Monitoring permissions
    MONITORING_READ = "monitoring:read"
    MONITORING_CONFIGURE = "monitoring:configure"

class Role(Enum):
    """Roles a user may hold; each value is the role name looked up via Role(name)."""

    # User roles
    VIEWER = "viewer"
    OPERATOR = "operator"
    DEVELOPER = "developer"
    ADMIN = "admin"
    SECURITY_OFFICER = "security_officer"
    COMPLIANCE_OFFICER = "compliance_officer"

# Permission sets are composed incrementally: each tier is the previous tier
# plus the permissions it adds, which makes the differences between roles
# explicit. The resulting sets are identical to a flat enumeration.
_VIEWER_PERMISSIONS = {
    Permission.SPECIALIST_READ,
    Permission.GUARDIAN_READ,
    Permission.SYSTEM_READ,
    Permission.MONITORING_READ,
}
_OPERATOR_PERMISSIONS = _VIEWER_PERMISSIONS | {
    Permission.SPECIALIST_EXECUTE,
    Permission.DATA_READ,
}
_DEVELOPER_PERMISSIONS = _OPERATOR_PERMISSIONS | {
    Permission.SPECIALIST_CONFIGURE,
    Permission.GUARDIAN_CONFIGURE,
    Permission.MONITORING_CONFIGURE,
    Permission.DATA_WRITE,
}
_ADMIN_PERMISSIONS = _DEVELOPER_PERMISSIONS | {
    Permission.SPECIALIST_MANAGE,
    Permission.GUARDIAN_MANAGE,
    Permission.SYSTEM_CONFIGURE,
    Permission.SYSTEM_MANAGE,
    Permission.DATA_EXPORT,
}
# The officer roles form a separate, read/export-oriented line.
_COMPLIANCE_OFFICER_PERMISSIONS = {
    Permission.SYSTEM_READ,
    Permission.MONITORING_READ,
    Permission.DATA_READ,
    Permission.DATA_EXPORT,
}
_SECURITY_OFFICER_PERMISSIONS = _COMPLIANCE_OFFICER_PERMISSIONS | {
    Permission.SYSTEM_ADMIN,
    Permission.MONITORING_CONFIGURE,
}

# Role -> set of permissions granted by that role.
ROLE_PERMISSIONS = {
    Role.VIEWER: _VIEWER_PERMISSIONS,
    Role.OPERATOR: _OPERATOR_PERMISSIONS,
    Role.DEVELOPER: _DEVELOPER_PERMISSIONS,
    Role.ADMIN: _ADMIN_PERMISSIONS,
    Role.SECURITY_OFFICER: _SECURITY_OFFICER_PERMISSIONS,
    Role.COMPLIANCE_OFFICER: _COMPLIANCE_OFFICER_PERMISSIONS,
}

class AccessControl:
    """Static helpers that answer permission questions from a user's roles."""

    @staticmethod
    def get_user_permissions(user: User) -> Set[Permission]:
        """Return the union of the permissions granted by each of the user's roles.

        Role names that do not correspond to a ``Role`` member are ignored.
        """
        granted = set()
        for role_name in user.roles:
            try:
                role = Role(role_name)
            except ValueError:
                # Unknown role name: contributes no permissions.
                continue
            granted.update(ROLE_PERMISSIONS.get(role, set()))
        return granted

    @staticmethod
    def has_permission(user: User, required_permission: Permission) -> bool:
        """Tell whether any of the user's roles grants *required_permission*."""
        return required_permission in AccessControl.get_user_permissions(user)

    @staticmethod
    def require_permission(required_permission: Permission):
        """Build a decorator that guards an async endpoint with a permission check.

        The wrapped coroutine must receive a ``User`` among its positional or
        keyword arguments; positional arguments are searched first. Raises
        HTTP 401 when no user is passed and HTTP 403 when the user lacks the
        permission.
        """
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                # Positional arguments take precedence over keyword arguments.
                candidates = [*args, *kwargs.values()]
                user = next(
                    (value for value in candidates if isinstance(value, User)),
                    None,
                )

                if user is None:
                    raise HTTPException(
                        status_code=status.HTTP_401_UNAUTHORIZED,
                        detail="Authentication required"
                    )

                allowed = AccessControl.has_permission(user, required_permission)
                if not allowed:
                    raise HTTPException(
                        status_code=status.HTTP_403_FORBIDDEN,
                        detail=f"Permission {required_permission.value} required"
                    )

                return await func(*args, **kwargs)
            return wrapper
        return decorator

Data Security

Encryption Implementation

# llm_guardian_cluster/security/encryption.py
import base64
import os
from typing import Optional

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

class EncryptionManager:
    """Symmetric (Fernet) and asymmetric (RSA-OAEP) encryption helpers.

    NOTE(review): when no ``master_key`` is supplied a fresh one is generated,
    and the RSA key pair is always generated per instance. Neither is persisted
    by this class, so ciphertexts cannot be decrypted by another instance
    unless the caller stores and re-supplies the keys — confirm key handling
    with the caller.
    """

    def __init__(self, master_key: Optional[bytes] = None):
        """Set up the Fernet cipher and an RSA-2048 key pair.

        Args:
            master_key: Fernet key as produced by ``Fernet.generate_key()``.
                A new key is generated when omitted.
        """
        if master_key is None:
            master_key = self._generate_master_key()

        self.master_key = master_key
        self.fernet = Fernet(master_key)

        # Generate RSA key pair for asymmetric encryption
        self.private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048
        )
        self.public_key = self.private_key.public_key()

    @staticmethod
    def _generate_master_key() -> bytes:
        """Generate a new master key for symmetric encryption."""
        return Fernet.generate_key()

    def encrypt_symmetric(self, data: str) -> str:
        """Encrypt *data* with the master key and return a base64 text token.

        The Fernet token is base64-encoded once more; ``decrypt_symmetric``
        reverses exactly this encoding, so the format must stay as is for
        previously stored values to remain readable.
        """
        encrypted_data = self.fernet.encrypt(data.encode())
        return base64.b64encode(encrypted_data).decode()

    def decrypt_symmetric(self, encrypted_data: str) -> str:
        """Decrypt a token produced by ``encrypt_symmetric``."""
        encrypted_bytes = base64.b64decode(encrypted_data.encode())
        decrypted_data = self.fernet.decrypt(encrypted_bytes)
        return decrypted_data.decode()

    def encrypt_asymmetric(self, data: str, public_key: Optional[bytes] = None) -> str:
        """Encrypt *data* with RSA-OAEP (SHA-256) and return it base64-encoded.

        Args:
            data: Text to encrypt. RSA-OAEP only fits short payloads (about 190
                bytes with a 2048-bit key and SHA-256); longer input is
                rejected by the underlying library.
            public_key: PEM-encoded public key of the recipient. This
                instance's own public key is used when omitted.
        """
        if public_key is None:
            pub_key = self.public_key
        else:
            pub_key = serialization.load_pem_public_key(public_key)

        encrypted_data = pub_key.encrypt(
            data.encode(),
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None
            )
        )
        return base64.b64encode(encrypted_data).decode()

    def decrypt_asymmetric(self, encrypted_data: str) -> str:
        """Decrypt a value produced by ``encrypt_asymmetric`` for this instance's key."""
        encrypted_bytes = base64.b64decode(encrypted_data.encode())
        decrypted_data = self.private_key.decrypt(
            encrypted_bytes,
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None
            )
        )
        return decrypted_data.decode()

    def get_public_key_pem(self) -> str:
        """Return this instance's public key as PEM text (SubjectPublicKeyInfo)."""
        # public_bytes() is the serialization method on public keys; the name
        # used previously (public_serialization) does not exist and raised
        # AttributeError on every call.
        pem = self.public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )
        return pem.decode()

# Database field encryption
class FieldEncryption:
    """Encrypt and decrypt individual PII values destined for database fields."""

    def __init__(self, encryption_manager: EncryptionManager):
        """Keep the manager whose symmetric cipher protects the fields."""
        self.encryption_manager = encryption_manager

    def encrypt_pii_field(self, value: str) -> str:
        """Return the encrypted form of *value*; falsy values pass through unchanged."""
        if value:
            return self.encryption_manager.encrypt_symmetric(value)
        return value

    def decrypt_pii_field(self, encrypted_value: str) -> str:
        """Return the plaintext of *encrypted_value*; falsy values pass through unchanged."""
        if encrypted_value:
            return self.encryption_manager.decrypt_symmetric(encrypted_value)
        return encrypted_value

PII Detection and Anonymization

# llm_guardian_cluster/security/pii_protection.py
import re
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import RecognizerResult, OperatorConfig

@dataclass
class PIIDetectionResult:
    """One detected PII span inside an analysed text."""

    # Presidio entity name or the name of a custom pattern (e.g. "API_KEY").
    entity_type: str
    # Start offset of the span in the analysed text (inclusive).
    start: int
    # End offset of the span in the analysed text (exclusive).
    end: int
    # Presidio score, or the fixed 0.9 assigned to custom pattern matches.
    confidence: float
    # The matched substring, i.e. text[start:end].
    text: str

class PIIProtector:
    """Detect, anonymize and pseudonymize PII using Presidio plus custom regexes."""

    def __init__(self):
        """Create the Presidio engines and compile the custom patterns."""
        self.analyzer = AnalyzerEngine()
        self.anonymizer = AnonymizerEngine()

        # Custom PII patterns
        self.custom_patterns = {
            'API_KEY': re.compile(r'[A-Za-z0-9]{32,}'),
            'IP_ADDRESS': re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'),
            'MODEL_NAME': re.compile(r'(?:gpt-|claude-|llama-)[A-Za-z0-9\-]+'),
        }

    def detect_pii(self, text: str, language: str = 'en') -> List["PIIDetectionResult"]:
        """Detect PII in *text* using Presidio and the custom patterns.

        The result may contain overlapping or duplicate spans: the same
        substring can be reported by Presidio and by a custom pattern (for
        example IP addresses, which both detect).
        """
        results = []

        # Use Presidio for standard PII detection
        presidio_results = self.analyzer.analyze(
            text=text,
            language=language,
            entities=["PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "CREDIT_CARD",
                     "US_SSN", "US_DRIVER_LICENSE", "US_PASSPORT", "IBAN_CODE",
                     "IP_ADDRESS", "URL"]
        )

        results.extend(
            PIIDetectionResult(
                entity_type=result.entity_type,
                start=result.start,
                end=result.end,
                confidence=result.score,
                text=text[result.start:result.end]
            )
            for result in presidio_results
        )

        # Add custom pattern detection
        for pattern_name, pattern in self.custom_patterns.items():
            results.extend(
                PIIDetectionResult(
                    entity_type=pattern_name,
                    start=match.start(),
                    end=match.end(),
                    confidence=0.9,
                    text=match.group()
                )
                for match in pattern.finditer(text)
            )

        return results

    @staticmethod
    def _resolve_overlaps(detections: List["PIIDetectionResult"]) -> List["PIIDetectionResult"]:
        """Reduce *detections* to spans that do not overlap each other.

        Candidates are ranked by confidence, then span length, then position;
        a candidate is kept only if it overlaps none of the spans kept so far.
        """
        ranked = sorted(
            detections,
            key=lambda d: (-d.confidence, -(d.end - d.start), d.start),
        )
        kept = []
        for candidate in ranked:
            if candidate.end <= candidate.start:
                continue  # empty span: nothing to replace
            if all(candidate.end <= other.start or candidate.start >= other.end
                   for other in kept):
                kept.append(candidate)
        return kept

    def anonymize_pii(self, text: str, anonymization_config: Optional[Dict] = None,
                      language: str = 'en') -> str:
        """Replace detected PII in *text* with placeholders.

        Args:
            text: Text to anonymize.
            anonymization_config: Mapping of entity type to Presidio
                ``OperatorConfig``. A default set of ``replace`` operators is
                used when omitted.
            language: Language code passed on to detection.
        """
        if anonymization_config is None:
            anonymization_config = {
                "PERSON": OperatorConfig("replace", {"new_value": "[PERSON]"}),
                "EMAIL_ADDRESS": OperatorConfig("replace", {"new_value": "[EMAIL]"}),
                "PHONE_NUMBER": OperatorConfig("replace", {"new_value": "[PHONE]"}),
                "CREDIT_CARD": OperatorConfig("replace", {"new_value": "[CREDIT_CARD]"}),
                "US_SSN": OperatorConfig("replace", {"new_value": "[SSN]"}),
                "IP_ADDRESS": OperatorConfig("replace", {"new_value": "[IP_ADDRESS]"}),
                "API_KEY": OperatorConfig("replace", {"new_value": "[API_KEY]"}),
            }

        # Detect PII first
        pii_results = self.detect_pii(text, language=language)

        # Convert to Presidio format
        presidio_results = [
            RecognizerResult(
                entity_type=result.entity_type,
                start=result.start,
                end=result.end,
                score=result.confidence
            )
            for result in pii_results
        ]

        # Anonymize
        anonymized_result = self.anonymizer.anonymize(
            text=text,
            analyzer_results=presidio_results,
            operators=anonymization_config
        )

        return anonymized_result.text

    def pseudonymize_pii(self, text: str, language: str = 'en') -> Tuple[str, Dict[str, str]]:
        """Replace each PII value with a stable pseudonym such as ``[PERSON_1]``.

        Identical original values map to the same pseudonym, so references
        stay consistent within the text.

        Args:
            text: Text to pseudonymize.
            language: Language code passed on to detection.

        Returns:
            The rewritten text and a mapping of original value to pseudonym.
        """
        # Overlapping or duplicate detections must be collapsed first:
        # replacing the same region twice would use offsets that the first
        # replacement already invalidated and garble the output.
        pii_results = self._resolve_overlaps(self.detect_pii(text, language=language))
        pseudonym_map = {}
        pseudonymized_text = text

        # Replace from the end of the text backwards so earlier offsets stay valid.
        pii_results.sort(key=lambda x: x.start, reverse=True)

        entity_counters = {}

        for result in pii_results:
            entity_type = result.entity_type
            original_text = result.text

            # Generate consistent pseudonym
            if original_text not in pseudonym_map:
                entity_counters[entity_type] = entity_counters.get(entity_type, 0) + 1
                pseudonym_map[original_text] = (
                    f"[{entity_type}_{entity_counters[entity_type]}]"
                )

            # Replace in text
            pseudonymized_text = (
                pseudonymized_text[:result.start] +
                pseudonym_map[original_text] +
                pseudonymized_text[result.end:]
            )

        return pseudonymized_text, pseudonym_map

Compliance Framework

GDPR Compliance

# llm_guardian_cluster/compliance/gdpr.py
from datetime import datetime, timedelta
from typing import List, Dict, Optional
from dataclasses import dataclass
from enum import Enum

class DataProcessingPurpose(Enum):
    """Purposes for which personal data is processed; keys of the retention policies."""

    TRAINING = "training"
    INFERENCE = "inference"
    MONITORING = "monitoring"
    ANALYTICS = "analytics"
    IMPROVEMENT = "improvement"

class LegalBasis(Enum):
    """Legal basis recorded for a processing activity."""

    CONSENT = "consent"
    CONTRACT = "contract"
    LEGAL_OBLIGATION = "legal_obligation"
    VITAL_INTERESTS = "vital_interests"
    PUBLIC_TASK = "public_task"
    LEGITIMATE_INTERESTS = "legitimate_interests"

@dataclass
class DataProcessingRecord:
    """Metadata describing one processing activity for one data subject."""

    # Key under which the record is stored by GDPRComplianceManager.
    id: str
    # Used to select a subject's records when handling data subject requests.
    data_subject_id: str
    purpose: DataProcessingPurpose
    legal_basis: LegalBasis
    data_categories: List[str]
    # Reported (in days) in access responses. The manager's retention and
    # erasure checks use its per-purpose policy table, not this field.
    retention_period: timedelta
    # Start of processing; retention expiry is computed from this timestamp.
    processing_start: datetime
    # False together with legal_basis CONSENT makes the record erasable.
    consent_given: bool
    consent_timestamp: Optional[datetime]
    data_source: str
    third_party_sharing: bool
    automated_decision_making: bool

class GDPRComplianceManager:
    """In-memory register of processing activities with GDPR helper queries.

    The manager stores processing *metadata* only. Request handlers therefore
    return reports describing which records a request affects; acting on the
    underlying data (deleting, correcting, exporting) is left to the caller.
    """

    def __init__(self):
        """Start with an empty register and the per-purpose retention policies."""
        self.processing_records: Dict[str, "DataProcessingRecord"] = {}
        self.retention_policies = {
            DataProcessingPurpose.TRAINING: timedelta(days=2555),  # 7 years
            DataProcessingPurpose.INFERENCE: timedelta(days=30),
            DataProcessingPurpose.MONITORING: timedelta(days=365),
            DataProcessingPurpose.ANALYTICS: timedelta(days=365),
            DataProcessingPurpose.IMPROVEMENT: timedelta(days=1095)  # 3 years
        }

    def record_data_processing(self, record: "DataProcessingRecord") -> str:
        """Store *record* under its id (replacing any previous entry) and return the id."""
        self.processing_records[record.id] = record
        return record.id

    def _retention_for(self, record: "DataProcessingRecord") -> timedelta:
        """Return the retention policy for the record's purpose (365 days if none is set)."""
        return self.retention_policies.get(record.purpose, timedelta(days=365))

    def check_retention_compliance(self) -> List[str]:
        """Return the ids of records whose retention period has expired."""
        current_time = datetime.utcnow()
        return [
            record_id
            for record_id, record in self.processing_records.items()
            if current_time > record.processing_start + self._retention_for(record)
        ]

    def handle_data_subject_request(self, request_type: str, data_subject_id: str) -> Dict:
        """Handle a GDPR data subject request and return a report dictionary.

        Args:
            request_type: One of "access", "rectification", "erasure",
                "portability" or "objection".
            data_subject_id: Subject whose records the request concerns.

        Raises:
            ValueError: If *request_type* is not one of the supported types.
        """
        handlers = {
            "access": self._handle_access_request,
            "rectification": self._handle_rectification_request,
            "erasure": self._handle_erasure_request,
            "portability": self._handle_portability_request,
            "objection": self._handle_objection_request,
        }
        handler = handlers.get(request_type)
        if handler is None:
            raise ValueError(f"Unknown request type: {request_type}")

        subject_records = [
            record for record in self.processing_records.values()
            if record.data_subject_id == data_subject_id
        ]
        return handler(subject_records)

    def _handle_access_request(self, records: List["DataProcessingRecord"]) -> Dict:
        """Describe every processing activity recorded for the subject."""
        return {
            "request_type": "access",
            "processing_activities": [
                {
                    "purpose": record.purpose.value,
                    "legal_basis": record.legal_basis.value,
                    "data_categories": record.data_categories,
                    "retention_period": record.retention_period.days,
                    "processing_start": record.processing_start.isoformat(),
                    "third_party_sharing": record.third_party_sharing,
                    "automated_decision_making": record.automated_decision_making
                }
                for record in records
            ],
            "data_sources": list(set(record.data_source for record in records)),
            "response_timestamp": datetime.utcnow().isoformat()
        }

    def _handle_rectification_request(self, records: List["DataProcessingRecord"]) -> Dict:
        """List the records and data sources in which the subject's data must be corrected."""
        return {
            "request_type": "rectification",
            "affected_records": [record.id for record in records],
            "data_sources": sorted({record.data_source for record in records}),
            "response_timestamp": datetime.utcnow().isoformat()
        }

    def _handle_erasure_request(self, records: List["DataProcessingRecord"]) -> Dict:
        """Split the subject's records into erasable ones and ones that must be kept."""
        erasable_records = []
        non_erasable_records = []

        for record in records:
            # Check if erasure is legally required or permitted
            if self._can_erase_data(record):
                erasable_records.append(record.id)
            else:
                non_erasable_records.append({
                    "record_id": record.id,
                    "reason": self._get_non_erasure_reason(record)
                })

        return {
            "request_type": "erasure",
            "erasable_records": erasable_records,
            "non_erasable_records": non_erasable_records,
            "response_timestamp": datetime.utcnow().isoformat()
        }

    def _handle_portability_request(self, records: List["DataProcessingRecord"]) -> Dict:
        """Report which of the subject's records fall under data portability.

        Portability is treated as applying to processing based on consent or
        on a contract (GDPR Art. 20); other records are listed separately.
        NOTE(review): classification rule to be confirmed by compliance.
        """
        portable_bases = (LegalBasis.CONSENT, LegalBasis.CONTRACT)
        return {
            "request_type": "portability",
            "portable_records": [
                {
                    "record_id": record.id,
                    "purpose": record.purpose.value,
                    "data_categories": record.data_categories,
                    "data_source": record.data_source
                }
                for record in records
                if record.legal_basis in portable_bases
            ],
            "non_portable_records": [
                record.id for record in records
                if record.legal_basis not in portable_bases
            ],
            "response_timestamp": datetime.utcnow().isoformat()
        }

    def _handle_objection_request(self, records: List["DataProcessingRecord"]) -> Dict:
        """Report which of the subject's records are open to an objection.

        Objection is treated as applying to processing based on a public task
        or on legitimate interests (GDPR Art. 21(1)).
        NOTE(review): classification rule to be confirmed by compliance.
        """
        objectionable_bases = (LegalBasis.PUBLIC_TASK, LegalBasis.LEGITIMATE_INTERESTS)
        return {
            "request_type": "objection",
            "objectionable_records": [
                record.id for record in records
                if record.legal_basis in objectionable_bases
            ],
            "other_records": [
                record.id for record in records
                if record.legal_basis not in objectionable_bases
            ],
            "response_timestamp": datetime.utcnow().isoformat()
        }

    def _can_erase_data(self, record: "DataProcessingRecord") -> bool:
        """Determine if the record's data can be erased.

        Implemented conditions: consent was the legal basis and is no longer
        given, or the retention policy for the record's purpose has expired.
        """
        if record.legal_basis == LegalBasis.CONSENT and not record.consent_given:
            return True

        # Check if retention period has expired
        if datetime.utcnow() > record.processing_start + self._retention_for(record):
            return True

        return False

    def _get_non_erasure_reason(self, record: "DataProcessingRecord") -> str:
        """Return a human-readable reason why the record's data cannot be erased."""
        if record.legal_basis == LegalBasis.LEGAL_OBLIGATION:
            return "Data retention required by legal obligation"
        elif record.legal_basis == LegalBasis.LEGITIMATE_INTERESTS:
            return "Legitimate interests override erasure request"
        elif record.purpose == DataProcessingPurpose.TRAINING:
            return "Data required for model integrity and performance"
        else:
            return "Erasure not applicable under current legal basis"

Audit Logging

# llm_guardian_cluster/compliance/audit.py
from datetime import datetime
from typing import Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
import json
import hashlib

class AuditEventType(Enum):
    """Categories of audit events; the value is what AuditEvent.to_dict() serializes."""

    DATA_ACCESS = "data_access"
    DATA_MODIFICATION = "data_modification"
    DATA_DELETION = "data_deletion"
    USER_AUTHENTICATION = "user_authentication"
    PERMISSION_CHANGE = "permission_change"
    SYSTEM_CONFIGURATION = "system_configuration"
    MODEL_TRAINING = "model_training"
    MODEL_INFERENCE = "model_inference"
    SECURITY_INCIDENT = "security_incident"
    COMPLIANCE_CHECK = "compliance_check"

@dataclass
class AuditEvent:
    """A single auditable action together with its request context."""

    event_id: str
    timestamp: datetime
    event_type: AuditEventType
    user_id: Optional[str]
    resource_id: str
    action: str
    result: str
    ip_address: Optional[str]
    user_agent: Optional[str]
    additional_data: Dict[str, Any]

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the event; timestamp becomes ISO-8601 text, event_type its value."""
        serialized = dict(
            event_id=self.event_id,
            timestamp=self.timestamp.isoformat(),
            event_type=self.event_type.value,
            user_id=self.user_id,
            resource_id=self.resource_id,
            action=self.action,
            result=self.result,
            ip_address=self.ip_address,
            user_agent=self.user_agent,
            additional_data=self.additional_data,
        )
        return serialized

    def calculate_integrity_hash(self) -> str:
        """Return the SHA-256 hex digest of the event's key-sorted JSON form."""
        canonical_json = json.dumps(self.to_dict(), sort_keys=True)
        digest = hashlib.sha256(canonical_json.encode())
        return digest.hexdigest()

class AuditLogger:
    """Append audit events to a storage backend as a hash chain.

    Every stored record carries the hash of its own event data and a chain
    hash that also covers the previous record's chain hash, so later
    modification or removal of a record is detectable.

    NOTE(review): the head of the chain is kept only in memory
    (``previous_hash``); a new logger instance starts a new chain — confirm
    how the head is restored after a restart.
    """

    def __init__(self, storage_backend):
        """Keep the backend; it must provide async ``store_audit_record`` and ``get_audit_records``."""
        self.storage = storage_backend
        self.previous_hash = None

    async def log_event(self, event: "AuditEvent") -> str:
        """Store *event* with its integrity hashes and return its event id."""
        # Calculate integrity hash
        event_hash = event.calculate_integrity_hash()

        # Chain with previous event for integrity
        if self.previous_hash:
            chained_data = f"{self.previous_hash}{event_hash}"
            chain_hash = hashlib.sha256(chained_data.encode()).hexdigest()
        else:
            chain_hash = event_hash

        # Store event with integrity information
        audit_record = {
            **event.to_dict(),
            "event_hash": event_hash,
            "chain_hash": chain_hash,
            "previous_hash": self.previous_hash
        }

        await self.storage.store_audit_record(audit_record)
        self.previous_hash = chain_hash

        return event.event_id

    async def verify_audit_trail_integrity(self, start_date: datetime,
                                         end_date: datetime) -> bool:
        """Verify the hash chain of the records stored for a given period.

        Each record's event hash is recomputed, its link to the preceding
        record is checked and its chain hash is recomputed.

        When the period does not start at the very first record of the chain,
        the first record's stored ``previous_hash`` is taken as the anchor:
        the link to the record *before* the window cannot be checked without
        that record, but everything inside the window is verified.

        Returns:
            True when all records in the period are consistent (or there are
            none), False as soon as one check fails.
        """
        records = await self.storage.get_audit_records(start_date, end_date)

        if not records:
            return True

        # Work on a sorted copy so the backend's list is not reordered.
        ordered = sorted(records, key=lambda r: r["timestamp"])

        # Anchor: None for the first record of a chain, otherwise the chain
        # hash of the record just before the window.
        previous_hash = ordered[0]["previous_hash"]

        for record in ordered:
            # Recalculate event hash
            event_dict = {k: v for k, v in record.items()
                         if k not in ["event_hash", "chain_hash", "previous_hash"]}
            calculated_hash = hashlib.sha256(
                json.dumps(event_dict, sort_keys=True).encode()
            ).hexdigest()

            if calculated_hash != record["event_hash"]:
                return False

            # Verify chain integrity
            if previous_hash != record["previous_hash"]:
                return False

            # Verify chain hash
            if previous_hash:
                chained_data = f"{previous_hash}{calculated_hash}"
                expected_chain_hash = hashlib.sha256(chained_data.encode()).hexdigest()
            else:
                expected_chain_hash = calculated_hash

            if expected_chain_hash != record["chain_hash"]:
                return False

            previous_hash = record["chain_hash"]

        return True

Container Security

Security Policies

# k8s/security/pod-security-policy.yaml
# NOTE(review): PodSecurityPolicy (policy/v1beta1) was deprecated in Kubernetes
# 1.21 and removed in 1.25 — confirm the target cluster version before relying
# on this manifest.
# Baseline policy: unprivileged, non-root pods with every capability dropped
# and a read-only root filesystem.
apiVersion: policy/v1beta1
kind: PodSecurityPolicy
metadata:
  name: llm-guardian-psp
spec:
  privileged: false
  allowPrivilegeEscalation: false
  requiredDropCapabilities:
    - ALL
  # Only these volume types may be mounted (hostPath is not among them).
  volumes:
    - "configMap"
    - "emptyDir"
    - "projected"
    - "secret"
    - "downwardAPI"
    - "persistentVolumeClaim"
  runAsUser:
    rule: "MustRunAsNonRoot"
  runAsGroup:
    rule: "MustRunAs"
    ranges:
      - min: 1000
        max: 65535
  # SELinux context and fsGroup are left unrestricted.
  seLinux:
    rule: "RunAsAny"
  supplementalGroups:
    rule: "MustRunAs"
    ranges:
      - min: 1000
        max: 65535
  fsGroup:
    rule: "RunAsAny"
  readOnlyRootFilesystem: true
---
# Policy for GPU workloads: additionally allows the SYS_ADMIN capability and
# hostPath mounts under the NVIDIA paths. It sets neither
# requiredDropCapabilities nor readOnlyRootFilesystem.
# NOTE(review): PodSecurityPolicy (policy/v1beta1) was removed in Kubernetes
# 1.25 — confirm the target cluster version.
apiVersion: policy/v1beta1
kind: PodSecurityPolicy
metadata:
  name: llm-guardian-gpu-psp
spec:
  privileged: false
  allowPrivilegeEscalation: false
  # NOTE(review): SYS_ADMIN is a very broad capability — confirm the GPU
  # runtime in use really requires it.
  allowedCapabilities:
    - SYS_ADMIN # Required for NVIDIA GPU access
  volumes:
    - "configMap"
    - "emptyDir"
    - "projected"
    - "secret"
    - "downwardAPI"
    - "persistentVolumeClaim"
    - "hostPath" # Required for GPU device access
  # NOTE(review): pathPrefix matches whole path elements, so "/dev/nvidia"
  # presumably does not cover device nodes such as /dev/nvidia0 — verify
  # against the paths the workloads actually mount.
  allowedHostPaths:
    - pathPrefix: "/dev/nvidia"
      readOnly: false
    - pathPrefix: "/usr/local/nvidia"
      readOnly: true
  runAsUser:
    rule: "MustRunAsNonRoot"
  runAsGroup:
    rule: "MustRunAs"
    ranges:
      - min: 1000
        max: 65535
  seLinux:
    rule: "RunAsAny"
  supplementalGroups:
    rule: "MustRunAs"
    ranges:
      - min: 1000
        max: 65535
  fsGroup:
    rule: "RunAsAny"

Network Policies

# k8s/security/network-policy.yaml
# Allow-list for pods labelled app=llm-guardian in the llm-guardian namespace.
# Peers given only by podSelector match pods in this same namespace.
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: llm-guardian-network-policy
  namespace: llm-guardian
spec:
  podSelector:
    matchLabels:
      app: llm-guardian
  policyTypes:
    - Ingress
    - Egress
  ingress:
    # Allow ingress from API gateway
    - from:
        - podSelector:
            matchLabels:
              app: api-gateway
      ports:
        - protocol: TCP
          port: 8080
    # Allow ingress from monitoring
    # NOTE(review): requires the monitoring namespace to carry the label
    # name=monitoring — confirm the label exists.
    - from:
        - namespaceSelector:
            matchLabels:
              name: monitoring
      ports:
        - protocol: TCP
          port: 9090
  egress:
    # Allow egress to databases
    - to:
        - podSelector:
            matchLabels:
              app: postgresql
      ports:
        - protocol: TCP
          port: 5432
    # Allow egress to vector database
    - to:
        - podSelector:
            matchLabels:
              app: chromadb
      ports:
        - protocol: TCP
          port: 8000
    # Allow egress to message queue
    - to:
        - podSelector:
            matchLabels:
              app: kafka
      ports:
        - protocol: TCP
          port: 9092
    # Allow DNS resolution
    # An empty "to" list places no restriction on the destination, so port 53
    # is reachable on any peer.
    - to: []
      ports:
        - protocol: UDP
          port: 53
        - protocol: TCP
          port: 53
---
# Default deny: the empty podSelector selects every pod in the namespace and,
# with no ingress/egress rules listed, both directions are blocked unless
# another policy allows the traffic.
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: deny-all-default
  namespace: llm-guardian
spec:
  podSelector: {}
  policyTypes:
    - Ingress
    - Egress

Secrets Management

# k8s/security/external-secrets.yaml
# Secret store backed by Vault; authenticates through Vault's Kubernetes auth
# method using the external-secrets-sa service account.
apiVersion: external-secrets.io/v1beta1
kind: SecretStore
metadata:
  name: vault-secret-store
  namespace: llm-guardian
spec:
  provider:
    vault:
      server: "https://vault.company.com"
      # Secrets are read from the "secret" mount using the v2 API version.
      path: "secret"
      version: "v2"
      auth:
        kubernetes:
          mountPath: "kubernetes"
          role: "llm-guardian-role"
          serviceAccountRef:
            name: "external-secrets-sa"
---
# Syncs the PostgreSQL username, password and host from Vault into the
# Kubernetes secret "database-secret".
apiVersion: external-secrets.io/v1beta1
kind: ExternalSecret
metadata:
  name: database-credentials
  namespace: llm-guardian
spec:
  # NOTE(review): a 15s interval re-reads Vault four times a minute for this
  # secret — confirm this polling rate is intended.
  refreshInterval: 15s
  secretStoreRef:
    name: vault-secret-store
    kind: SecretStore
  target:
    name: database-secret
    creationPolicy: Owner
  data:
    - secretKey: username
      remoteRef:
        key: database/postgresql
        property: username
    - secretKey: password
      remoteRef:
        key: database/postgresql
        property: password
    - secretKey: host
      remoteRef:
        key: database/postgresql
        property: host
---
# Syncs the external API key and the encryption master key from Vault into
# the Kubernetes secret "api-keys-secret".
apiVersion: external-secrets.io/v1beta1
kind: ExternalSecret
metadata:
  name: api-keys
  namespace: llm-guardian
spec:
  # NOTE(review): a 15s interval re-reads Vault four times a minute for this
  # secret — confirm this polling rate is intended.
  refreshInterval: 15s
  secretStoreRef:
    name: vault-secret-store
    kind: SecretStore
  target:
    name: api-keys-secret
    creationPolicy: Owner
  data:
    - secretKey: openai_api_key
      remoteRef:
        key: api-keys/external
        property: openai_api_key
    - secretKey: encryption_key
      remoteRef:
        key: encryption/master
        property: key

Next: Performance Optimization