diff --git a/src/governance_engine/__pycache__/compliance_engine.cpython-312.pyc b/src/governance_engine/__pycache__/compliance_engine.cpython-312.pyc new file mode 100644 index 0000000..96be640 Binary files /dev/null and b/src/governance_engine/__pycache__/compliance_engine.cpython-312.pyc differ diff --git a/src/governance_engine/__pycache__/gsri_scoring_engine.cpython-312.pyc b/src/governance_engine/__pycache__/gsri_scoring_engine.cpython-312.pyc index 47d087f..0239a2d 100644 Binary files a/src/governance_engine/__pycache__/gsri_scoring_engine.cpython-312.pyc and b/src/governance_engine/__pycache__/gsri_scoring_engine.cpython-312.pyc differ diff --git a/src/governance_engine/compliance_engine.py b/src/governance_engine/compliance_engine.py new file mode 100644 index 0000000..9a098c6 --- /dev/null +++ b/src/governance_engine/compliance_engine.py @@ -0,0 +1,89 @@ +import hashlib +import json +import numpy as np + +class MASFEATCompliance: + """ + Implements MAS FEAT (Fairness, Ethics, Accountability and Transparency) compliance. + Focuses on ZK-Fairness proofs (Demographic Parity) for MoE nodes. + """ + def __init__(self): + pass + + def calculate_demographic_parity(self, selection_rates): + """ + Calculates the Demographic Parity Difference. + selection_rates: dict mapping group_id to selection_rate (0.0 to 1.0) + """ + rates = list(selection_rates.values()) + if not rates: + return 0.0 + return max(rates) - min(rates) + + def generate_zk_fairness_proof(self, selection_rates, threshold=0.1): + """ + Generates a simulated Zero-Knowledge proof of fairness. + """ + dp_diff = self.calculate_demographic_parity(selection_rates) + is_fair = dp_diff <= threshold + + proof_data = { + "dp_diff": dp_diff, + "threshold": threshold, + "is_fair": is_fair, + "timestamp": str(np.datetime64('now')) + } + + # Simulate a ZK-proof hash + proof_hash = hashlib.sha256(json.dumps(proof_data, sort_keys=True).encode()).hexdigest() + + return { + "proof_hash": proof_hash, + "fairness_verified": is_fair, + "metrics": {"dp_diff": round(dp_diff, 4)} + } + +class HKMAEthicsCompliance: + """ + Implements HKMA Ethics compliance. + Focuses on ASA (Autonomous System Accountability) Interpretability Layer using CAE. + """ + def __init__(self): + pass + + def generate_cae(self, attribution_data): + """ + Generates Contextual Attribution Envelopes (CAE). + attribution_data: dict of feature attributions + """ + if not attribution_data: + return {} + + # CAE is a structured interpretability wrapper + envelope = { + "version": "1.0", + "contextual_bounds": { + "min": round(min(attribution_data.values()), 4), + "max": round(max(attribution_data.values()), 4) + }, + "attributions": {k: round(v, 4) for k, v in attribution_data.items()}, + "integrity_seal": hashlib.sha256(str(attribution_data).encode()).hexdigest() + } + return envelope + +class ComplianceEngine: + def __init__(self): + self.mas_feat = MASFEATCompliance() + self.hkma_ethics = HKMAEthicsCompliance() + self.maturity_score = 3.0 # Target Maturity Score for Q4 2026 + + def run_remediation_audit(self, telemetry): + """ + Runs a full regulatory remediation audit. + """ + results = { + "mas_feat": self.mas_feat.generate_zk_fairness_proof(telemetry.get("selection_rates", {})), + "hkma_ethics_cae": self.hkma_ethics.generate_cae(telemetry.get("attributions", {})), + "ethics_maturity_score": self.maturity_score + } + return results diff --git a/src/governance_engine/gsri_scoring_engine.py b/src/governance_engine/gsri_scoring_engine.py index 351d06c..a7da90b 100644 --- a/src/governance_engine/gsri_scoring_engine.py +++ b/src/governance_engine/gsri_scoring_engine.py @@ -1,21 +1,26 @@ import numpy as np +from src.governance_engine.compliance_engine import ComplianceEngine class GSRIScoringEngine: """ Bayesian-based systemic risk monitor for the Omni-Sentinel environment. Calculates the Global Systemic Risk Index (G-SRI). + Integrates regulatory compliance remediation for MAS FEAT and HKMA Ethics. """ def __init__(self, prior_risk=0.2): self.prior_risk = prior_risk self.threshold = 40.0 + self.compliance_engine = ComplianceEngine() def calculate_gsri(self, telemetry_data): """ Calculates GSRI using a simplified Bayesian update. telemetry_data: dict containing risk factors (0.0 to 1.0) """ - # Risk factors: alignment_drift, compute_anomaly, breakout_probability - factors = list(telemetry_data.values()) + # Extract direct risk factors for Bayesian update + direct_factors = {k: v for k, v in telemetry_data.items() if isinstance(v, (int, float))} + factors = list(direct_factors.values()) + if not factors: return self.prior_risk * 100 @@ -28,11 +33,34 @@ def calculate_gsri(self, telemetry_data): gsri = posterior * 100 return round(gsri, 2) - def is_safe(self, gsri): + def verify_compliance(self, telemetry_data): + """ + Verifies regulatory compliance against MAS FEAT and HKMA Ethics. + """ + return self.compliance_engine.run_remediation_audit(telemetry_data) + + def is_safe(self, gsri, compliance_results=None): + """ + Determines if the environment is safe based on GSRI and optional compliance status. + """ + if compliance_results: + # If MAS FEAT fairness is not verified, it's an automatic UNSAFE state + if not compliance_results.get("mas_feat", {}).get("fairness_verified", True): + return False + return gsri < self.threshold if __name__ == "__main__": engine = GSRIScoringEngine() - test_data = {"alignment_drift": 0.1, "compute_anomaly": 0.05, "breakout_probability": 0.02} + test_data = { + "alignment_drift": 0.1, + "compute_anomaly": 0.05, + "breakout_probability": 0.02, + "selection_rates": {"group_a": 0.8, "group_b": 0.75}, + "attributions": {"feature_1": 0.5, "feature_2": -0.2} + } gsri = engine.calculate_gsri(test_data) - print(f"G-SRI: {gsri} (Safe: {engine.is_safe(gsri)})") + compliance = engine.verify_compliance(test_data) + print(f"G-SRI: {gsri}") + print(f"Compliance Results: {compliance}") + print(f"Safe: {engine.is_safe(gsri, compliance)}") diff --git a/src/infrastructure/__pycache__/pqc_worm_logger.cpython-312.pyc b/src/infrastructure/__pycache__/pqc_worm_logger.cpython-312.pyc index 55d4939..a30c540 100644 Binary files a/src/infrastructure/__pycache__/pqc_worm_logger.cpython-312.pyc and b/src/infrastructure/__pycache__/pqc_worm_logger.cpython-312.pyc differ diff --git a/src/infrastructure/__pycache__/tpm_attestor.cpython-312.pyc b/src/infrastructure/__pycache__/tpm_attestor.cpython-312.pyc index 7cd233f..d4042e3 100644 Binary files a/src/infrastructure/__pycache__/tpm_attestor.cpython-312.pyc and b/src/infrastructure/__pycache__/tpm_attestor.cpython-312.pyc differ diff --git a/src/roadmap/REFERENCE_ARCHITECTURE.md b/src/roadmap/REFERENCE_ARCHITECTURE.md index 8e985c7..34e33d4 100644 --- a/src/roadmap/REFERENCE_ARCHITECTURE.md +++ b/src/roadmap/REFERENCE_ARCHITECTURE.md @@ -16,6 +16,8 @@ Logs are signed using Post-Quantum Cryptographic algorithms (ML-DSA) to ensure l ### TPM Attestor Verifies that the cognitive environment (OS, Drivers, Orchestrator) has not been tampered with before allowing high-risk cognitive tasks. -## 3. Regulatory Compliance +## 3. Regulatory Compliance & Remediation +- **MAS FEAT (Fairness, Ethics, Accountability and Transparency)**: Implements ZK-Fairness proofs for retail-facing Mixture of Experts (MoE) nodes, ensuring Demographic Parity. +- **HKMA Ethics Compliance**: ASA Interpretability Layer using Contextual Attribution Envelopes (CAE) for model accountability. - **ZK-Snarks**: Used for proving compliance with safety constraints without leaking proprietary model weights or internal telemetry details. - **OSCAL**: Standardized machine-readable compliance documentation for automated audits. diff --git a/tests/__pycache__/test_compliance.cpython-312.pyc b/tests/__pycache__/test_compliance.cpython-312.pyc new file mode 100644 index 0000000..e6b4500 Binary files /dev/null and b/tests/__pycache__/test_compliance.cpython-312.pyc differ diff --git a/tests/__pycache__/test_governance.cpython-312.pyc b/tests/__pycache__/test_governance.cpython-312.pyc index bb89c6a..f00c591 100644 Binary files a/tests/__pycache__/test_governance.cpython-312.pyc and b/tests/__pycache__/test_governance.cpython-312.pyc differ diff --git a/tests/test_compliance.py b/tests/test_compliance.py new file mode 100644 index 0000000..41929d4 --- /dev/null +++ b/tests/test_compliance.py @@ -0,0 +1,47 @@ +import unittest +from src.governance_engine.compliance_engine import ComplianceEngine, MASFEATCompliance, HKMAEthicsCompliance +from src.governance_engine.gsri_scoring_engine import GSRIScoringEngine + +class TestComplianceSystem(unittest.TestCase): + def setUp(self): + self.engine = ComplianceEngine() + + def test_mas_feat_fairness(self): + mas = MASFEATCompliance() + # Fair scenario + fair_rates = {"group_a": 0.5, "group_b": 0.55} + proof = mas.generate_zk_fairness_proof(fair_rates) + self.assertTrue(proof["fairness_verified"]) + self.assertLessEqual(proof["metrics"]["dp_diff"], 0.1) + + # Unfair scenario + unfair_rates = {"group_a": 0.8, "group_b": 0.4} + proof = mas.generate_zk_fairness_proof(unfair_rates) + self.assertFalse(proof["fairness_verified"]) + self.assertGreater(proof["metrics"]["dp_diff"], 0.1) + + def test_hkma_ethics_cae(self): + hkma = HKMAEthicsCompliance() + attributions = {"age": 0.45, "income": -0.12, "location": 0.05} + cae = hkma.generate_cae(attributions) + + self.assertEqual(cae["version"], "1.0") + self.assertEqual(cae["contextual_bounds"]["max"], 0.45) + self.assertEqual(cae["contextual_bounds"]["min"], -0.12) + self.assertIn("integrity_seal", cae) + + def test_gsri_compliance_integration(self): + gsri_engine = GSRIScoringEngine() + telemetry = { + "drift": 0.05, + "selection_rates": {"a": 0.5, "b": 0.8} # Unfair + } + gsri = gsri_engine.calculate_gsri(telemetry) + compliance = gsri_engine.verify_compliance(telemetry) + + self.assertFalse(gsri_engine.is_safe(gsri, compliance)) + self.assertFalse(compliance["mas_feat"]["fairness_verified"]) + self.assertEqual(compliance["ethics_maturity_score"], 3.0) + +if __name__ == "__main__": + unittest.main()