-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathrules_engine.py
More file actions
5646 lines (5199 loc) · 379 KB
/
Copy pathrules_engine.py
File metadata and controls
5646 lines (5199 loc) · 379 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
AI/ML Threat Modeling Rules Engine — 2026 Architecture
=======================================================
Architecture:
• Weighted control scoring : Yes=0, Partial=1, Unknown=2, No=3
• Per-finding risk formula : risk = (impact × likelihood) + (exposure × 2) + control_gap_modifier
• Severity thresholds : 0-10 Low | 11-18 Medium | 19-26 High | 27+ Critical
• Confidence scoring : High / Medium / Low per finding
• Modular threat categories : LLM, RAG, Agentic, MCP, Classical ML, Infra/Data/Identity
• Full OWASP coverage : LLM Top 10 · ML Top 10 · Agentic ASI · MCP Top 10
• Attack graph chains : dynamic, multi-step
• Abuse-case layer : what an attacker achieves
• Deduplication : clustered by root_cause
• Compliance : specific article references
• how_to_test : per finding (rendered for security persona)
"""
from collections import defaultdict
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Rank per severity label; a lower rank means a more severe finding.
SEVERITY_ORDER = {"Critical": 0, "High": 1, "Medium": 2, "Low": 3}
# Control-gap points per questionnaire answer. "Not Applicable" scores the
# same as "Yes" (0); the None entry scores an absent answer like "Unknown" (2).
CONTROL_SCORE = {"Yes": 0, "Partial": 1, "Unknown": 2, "No": 3, "Not Applicable": 0, None: 2}
# Regime name -> list of specific article / clause references for that regime.
COMPLIANCE_DETAIL = {
    "GDPR": ["GDPR Art.5(1)(f) – integrity & confidentiality",
             "GDPR Art.32 – security of processing",
             "GDPR Art.35 – DPIA for high-risk AI"],
    "HIPAA": ["HIPAA §164.312(a)(1) – access controls",
              "HIPAA §164.312(e)(1) – transmission security",
              "HIPAA §164.308(a)(1) – security management"],
    "PCI DSS": ["PCI DSS Req.3.4 – protect stored data",
                "PCI DSS Req.6.2 – secure development",
                "PCI DSS Req.10.2 – implement audit logs"],
    "SOC 2": ["SOC2 CC6.1 – logical access controls",
              "SOC2 CC6.6 – network boundary protection",
              "SOC2 CC7.2 – system monitoring"],
    "ISO 27001":["ISO 27001 A.8.2 – information classification",
                 "ISO 27001 A.9.4 – system access control",
                 "ISO 27001 A.12.4 – logging & monitoring"],
    "NIST AI RMF":["NIST AI RMF – GOVERN 1.2",
                   "NIST AI RMF – MAP 2.3",
                   "NIST AI RMF – MEASURE 2.5"],
    "EU AI Act":["EU AI Act Art.9 – risk management system",
                 "EU AI Act Art.13 – transparency",
                 "EU AI Act Art.15 – accuracy, robustness & cybersecurity"],
    "CCPA": ["CCPA §1798.100 – right to know",
             "CCPA §1798.150 – security breach liability"],
}
# ---------------------------------------------------------------------------
# Input usage tracking (Part 7 / Part 5)
# ---------------------------------------------------------------------------
USED_KEYS: set = set()
def _reset_used_keys():
"""Clear tracking set at the start of each evaluate_threats() call."""
USED_KEYS.clear()
# ---------------------------------------------------------------------------
# Scoring helpers
# ---------------------------------------------------------------------------
def _cs(inputs, key):
    """Score the answer stored under *key* on the 0-3 control-gap scale.

    Scalar answers are looked up in CONTROL_SCORE; an answer that is not in
    the table scores 2. List answers are scored by emptiness: an empty list
    scores 3 (no control) and a non-empty list scores 0 (control present).

    Side effect: *key* is added to USED_KEYS so input coverage can be audited.
    """
    USED_KEYS.add(key)
    answer = inputs.get(key)
    if not isinstance(answer, list):
        return CONTROL_SCORE.get(answer, 2)
    return 3 if not answer else 0
def filter_inputs_for_engine(inputs):
    """Return a copy of *inputs* without out-of-scope "Not Applicable" answers.

    A key is dropped only when both hold:
      * its value is exactly "Not Applicable", and
      * it belongs to a feature family (agentic / MCP / RAG / multimodal)
        that is not active for this system.

    Family activation:
      * agentic    - get_ai_profile(inputs) == "agentic"
      * MCP        - mcp_usage == "Yes" or profile == "mcp"
      * RAG        - rag_usage == "Yes" or profile == "rag"
      * multimodal - ai_type == "Multimodal AI"

    Every other entry is copied through unchanged and in its original order;
    *inputs* itself is never mutated.

    NOTE(review): CONTROL_SCORE scores "Not Applicable" as 0 while _cs()
    scores a missing key as 2, so a dropped key scores higher than a kept one
    if any rule still reads it - confirm that rules for an inactive family
    never score these keys.
    """
    ai_type = inputs.get("ai_type", "")
    profile = get_ai_profile(inputs)

    # (family is active?, keys owned by that family)
    families = (
        (profile in ("agentic",), {
            "agentic_autonomous", "agentic_tool_access", "agentic_logging",
            "agentic_hitl", "agentic_sensitive_data", "agentic_malicious_input_detection",
            "agentic_memory", "agentic_memory_controls", "agentic_identity_scoped",
            "agentic_code_execution", "agentic_code_sandbox", "agentic_supply_chain_controls",
            "agentic_kill_switch", "agentic_multi_agent", "agentic_inter_agent_auth",
            "agentic_plan_inspection", "agent_collusion_controls", "agent_identity_verification",
            "agent_goal_drift_monitoring", "agent_credential_acquisition",
            "agent_hitl_bypass_detection", "agent_action_rate_limit", "agent_per_run_budget",
            "agent_execution_isolation", "agent_scope_declared", "agent_tool_output_sanitization",
            "agent_destructive_action_gate", "agent_data_write_access", "agent_exfil_controls",
        }),
        (inputs.get("mcp_usage") == "Yes" or profile == "mcp", {
            "mcp_usage", "mcp_third_party_servers", "mcp_remote_servers",
            "mcp_tool_schema_integrity", "mcp_tool_output_sanitization", "mcp_authz",
            "mcp_human_approval", "mcp_server_isolation", "mcp_federation_trust",
            "mcp_tool_description_validation", "mcp_prompt_in_result_filtering",
            "mcp_transport_security", "mcp_shadow_discovery", "mcp_audit_telemetry",
        }),
        (inputs.get("rag_usage") == "Yes" or profile == "rag", {
            "rag_usage", "rag_data_sources", "retrieval_access_control",
            "retrieval_content_filtering", "vector_db_isolation",
            "embedding_model_provenance", "embedding_inversion_controls",
        }),
        (ai_type == "Multimodal AI", {
            "multimodal_injection_testing", "vision_injection_testing",
            "audio_injection_testing", "document_pdf_injection",
        }),
    )

    # Union of the keys owned by every family that is NOT active.
    out_of_scope = set()
    for active, family_keys in families:
        if not active:
            out_of_scope |= family_keys

    return {
        name: answer
        for name, answer in inputs.items()
        if not (answer == "Not Applicable" and name in out_of_scope)
    }
def _control_gap(inputs, *keys):
    """Add up the _cs() score of every control key in *keys*.

    Returns 0 when no keys are given. Each key is recorded in USED_KEYS as a
    side effect of _cs().
    """
    total = 0
    for control_key in keys:
        total += _cs(inputs, control_key)
    return total
def _severity_from_risk(risk_value):
"""Map a risk_score to a severity label.
Calibration v3 — additive model: risk = (I × L) + (E × 2) + cgap_mod.
Range with axes 1..5 and cgap_mod 0..3:
min: 1 + 2 + 0 = 3
max: 25 + 10 + 3 = 38
Thresholds tuned so a default-axes (I=L=E=3) finding lands in MEDIUM,
a public worst-case lands in CRITICAL, and the bulk of "weak control"
findings without other risk factors land in MEDIUM rather than HIGH.
0–10 → Low
11–18 → Medium
19–26 → High
27+ → Critical
"""
if risk_value >= 27: return "Critical"
if risk_value >= 19: return "High"
if risk_value >= 11: return "Medium"
return "Low"
def _exposure_score(exposure):
"""Top-level exposure axis (1–5).
Public > Partner API > Authenticated > Internal > Batch/offline.
Used by every finding to compose risk_score = I × L × E + min(cgap, 3).
"""
return {
"Public": 5,
"Partner / third-party API": 4,
"Authenticated Users Only": 3,
"Embedded in product": 3,
"Internal / Private network only": 2,
"Internal-only": 2,
"Back-office batch job": 1,
"Developer-only / experimental": 1,
}.get(exposure, 3)
def _cgap_modifier(cgap):
"""Diminishing modifier so control_gap doesn't dominate the multiplicative
risk_score. Maps any cgap to {0,1,2}.
cgap == 0 → 0
cgap == 1 → 1
cgap == 2 → 1
cgap >= 3 → 2
"""
cgap = max(0, min(int(cgap), 3))
if cgap >= 3: return 2
if cgap >= 1: return 1
return 0
def _autogen_quick_win(quick_win, mitigation):
"""If a finding has no quick_win, derive a short actionable line from its
mitigation so every finding ships with a < 1 day next-step.
"""
if quick_win and quick_win.strip():
return quick_win
if not mitigation:
return "Document the gap and assign an owner this sprint."
# Take the first sentence / first action and trim.
first = mitigation.split(". ")[0].strip().rstrip(".")
if len(first) > 160:
first = first[:157].rsplit(" ", 1)[0] + "…"
return first + "."
def _confidence(strong_signals, total_signals):
"""Compute confidence: High ≥ 60%, Medium ≥ 30%, else Low."""
if total_signals == 0: return "Low"
ratio = strong_signals / total_signals
if ratio >= 0.6: return "High"
if ratio >= 0.3: return "Medium"
return "Low"
def _escalate(severity, business_impact, project_stage):
    """Raise *severity* by one level for high-impact, late-stage systems.

    Escalation applies only when *business_impact* is "High" or "Critical"
    and *project_stage* is "Pilot" or "Production"; otherwise *severity* is
    returned unchanged. "Critical" stays "Critical". A label that is not in
    SEVERITY_ORDER is ranked like "Medium" and therefore escalates to "High".
    """
    if business_impact not in ("High", "Critical"):
        return severity
    if project_stage not in ("Pilot", "Production"):
        return severity
    # Lower rank = more severe; never go past rank 0 (Critical).
    target_rank = max(SEVERITY_ORDER.get(severity, 2) - 1, 0)
    for label, rank in SEVERITY_ORDER.items():
        if rank == target_rank:
            return label
    return severity
def _compliance(*regimes):
    """Flatten regime names into their specific article references.

    Each name is replaced by its COMPLIANCE_DETAIL entries, in argument
    order; a name with no entry is passed through as-is. Always returns a
    new list.
    """
    return [
        reference
        for regime in regimes
        for reference in COMPLIANCE_DETAIL.get(regime, [regime])
    ]
def get_ai_profile(inputs):
    """Classify the system under review into a short profile string.

    Profiles: "agentic" | "mcp" | "rag" | "llm" | "generative" | "ml" | "unknown"

    Rules are checked in that order and the first match wins, so a system
    that is both agentic and MCP-enabled is reported as "agentic". Signals
    read: ai_type, model_type, agentic_autonomous, mcp_usage, rag_usage.
    """
    kind = inputs.get("ai_type", "")
    model = inputs.get("model_type", "")

    looks_agentic = (
        kind == "Agentic AI (e.g., Autonomous Agents)"
        or inputs.get("agentic_autonomous") == "Yes"
        or model in ["Agentic Workflow / Autonomous Agent", "Multi-Agent System"]
    )
    if looks_agentic:
        return "agentic"

    looks_mcp = (
        inputs.get("mcp_usage") == "Yes"
        or model == "MCP / Tool-Integrated Assistant"
    )
    if looks_mcp:
        return "mcp"

    looks_rag = (
        kind == "RAG / AI Search"
        or inputs.get("rag_usage") == "Yes"
        or model == "RAG Application"
    )
    if looks_rag:
        return "rag"

    # From here on only ai_type decides.
    if kind in ("Large Language Model (LLM)", "Multimodal AI"):
        return "llm"
    if kind == "Generative AI (e.g., Image/Audio Generation)":
        return "generative"
    classical_types = (
        "Traditional ML", "Computer Vision", "NLP (Non-LLM)",
        "Recommendation System", "Anomaly Detection / Fraud Detection",
        "Classical ML / Predictive Model",
    )
    return "ml" if kind in classical_types else "unknown"
# Profile-specific likelihood/impact modifiers applied post-finding generation.
# Consumed by _apply_profile_boosts(): each Δ is added to the finding's axis,
# the axis is capped at 5, and the risk score is then recomputed.
_PROFILE_BOOSTS = {
    # (threat_category, profile) → {"likelihood": Δ, "impact": Δ}
    ("Agentic", "agentic"): {"impact": 2}, # tool misuse + autonomous-action blast radius
    ("Classical ML","ml"): {"likelihood": 2}, # data poisoning + model inversion are easier in pure ML
    ("ML", "ml"): {"likelihood": 1},
    ("RAG", "rag"): {"likelihood": 1}, # retrieval attacks more feasible
    ("MCP", "mcp"): {"impact": 1}, # MCP tool scope amplifies impact
    ("LLM", "llm"): {}, # baseline — no adjustment
}
# Per-profile extra boosts targeted at specific root_causes (Part 4).
# Shape: profile → root_cause → {"likelihood": Δ, "impact": Δ}. These are
# added on top of any matching _PROFILE_BOOSTS entry.
_PROFILE_ROOT_CAUSE_BOOSTS = {
    "agentic": {
        "tool_chaining": {"impact": 1},
        "excessive_agency": {"impact": 1},
        "missing_oversight": {"impact": 1},
        "irreversible_action": {"impact": 1},
        "rogue_agent_behaviour": {"impact": 1},
    },
    "ml": {
        "data_poisoning": {"likelihood": 1},
        "data_disclosure": {"likelihood": 1},
        "model_inversion": {"likelihood": 1},
    },
}
# LLM-only categories that should NOT appear when the system is pure ML (Part 4)
# NOTE(review): not referenced by any code visible in this part of the file —
# confirm it is still used before relying on it.
_LLM_ONLY_CATEGORIES = {"LLM", "RAG", "Agentic", "MCP"}
def _recompute_risk(t):
    """Rebuild a finding's score and severity from its current axes, in place.

    Additive model: risk = (I × L) + (E × 2) + cgap_mod.

    impact_score, likelihood_score and exposure_score are read from *t*
    (default 3), converted with int() and clamped to 1..5; the clamped values
    are written back. control_gap_score (default 0) goes through
    _cgap_modifier(). The total is stored under both risk_score and
    risk_value (backward-compat alias) and severity is re-derived from it.
    """
    def _axis(field):
        # Missing axis defaults to 3; anything outside 1..5 is clamped.
        return max(1, min(5, int(t.get(field, 3))))

    impact = _axis("impact_score")
    likelihood = _axis("likelihood_score")
    exposure = _axis("exposure_score")
    gap_modifier = _cgap_modifier(t.get("control_gap_score", 0))

    total = impact * likelihood + exposure * 2 + gap_modifier
    t.update({
        "impact_score": impact,
        "likelihood_score": likelihood,
        "exposure_score": exposure,
        "risk_score": total,
        "risk_value": total,  # backward-compat alias
        "severity": _severity_from_risk(total),
    })
def _apply_profile_boosts(threats, profile):
    """Raise impact / likelihood on findings that *profile* amplifies, in place.

    For each finding the boost is the _PROFILE_BOOSTS entry for
    (threat_category, profile) plus the _PROFILE_ROOT_CAUSE_BOOSTS entry for
    (profile, root_cause). Boosted axes are capped at 5 and the finding is
    then re-scored with _recompute_risk(). Findings with no boost are left
    untouched (and are not re-scored).
    """
    for finding in threats:
        category = finding.get("threat_category", "")
        root_cause = finding.get("root_cause", "")

        # Copy so the shared lookup tables are never mutated.
        boost = dict(_PROFILE_BOOSTS.get((category, profile), {}) or {})
        extra = _PROFILE_ROOT_CAUSE_BOOSTS.get(profile, {}).get(root_cause)
        for axis, bump in (extra or {}).items():
            boost[axis] = boost.get(axis, 0) + bump

        likelihood_bump = boost.get("likelihood", 0)
        impact_bump = boost.get("impact", 0)
        if not (likelihood_bump or impact_bump):
            continue

        finding["impact_score"] = min(5, finding.get("impact_score", 3) + impact_bump)
        finding["likelihood_score"] = min(5, finding.get("likelihood_score", 3) + likelihood_bump)
        _recompute_risk(finding)
def _profile_relevance_rank(threat_category, profile):
"""Lower number = more relevant to this profile (used for sort)."""
rel = {
"agentic": ["Agentic", "MCP", "LLM", "RAG", "Infrastructure", "Governance", "Classical ML", "ML"],
"mcp": ["MCP", "Agentic", "LLM", "RAG", "Infrastructure", "Governance", "Classical ML", "ML"],
"rag": ["RAG", "LLM", "Agentic", "Infrastructure", "Governance", "MCP", "Classical ML", "ML"],
"llm": ["LLM", "RAG", "Agentic", "MCP", "Infrastructure", "Governance", "Classical ML", "ML"],
"generative":["LLM", "RAG", "Infrastructure", "Governance", "Agentic", "MCP", "Classical ML", "ML"],
"ml": ["Classical ML", "ML", "Infrastructure", "Governance", "LLM", "RAG", "Agentic", "MCP"],
}.get(profile, [])
if threat_category in rel:
return rel.index(threat_category)
return len(rel) # unknown categories sort last
# FIX 2 (follow-up): one-paragraph summary of the report's risk character
# per AI profile. Returned as `ai_type_summary` in evaluate_threats() result.
# Keyed by the profile strings produced by get_ai_profile(); "unknown" has no
# entry here and gets the fallback text in _build_ai_type_summary().
_PROFILE_SUMMARIES = {
    "ml": (
        "This system is primarily data-driven. Key risks center on data poisoning, "
        "model inversion, membership inference, and inference-time leakage. "
        "Investing in training-data provenance and inference-API hardening pays the most."
    ),
    "agentic": (
        "This system has autonomous behaviour. Key risks include tool misuse, "
        "privilege escalation, uncontrolled actions, and long-horizon goal drift. "
        "Scoped credentials, HITL gates, and per-run budgets are the highest-leverage controls."
    ),
    "rag": (
        "This system depends on external data retrieval. Key risks include retrieval "
        "poisoning, indirect prompt injection via documents, cross-tenant leakage, and "
        "embedding-store exposure. Lock down ingest, isolate per-tenant, and filter retrieved context."
    ),
    "mcp": (
        "This system integrates external tools via MCP. Key risks include tool-schema "
        "poisoning, intent subversion, lateral movement across tools, and weak transport "
        "or audit. Allow-list servers, scope tokens narrowly, and log every tool call."
    ),
    "llm": (
        "This system is an LLM-style assistant. Key risks include prompt injection, system-"
        "prompt extraction, output handling flaws, and unbounded resource consumption. "
        "An AI gateway, output filter, and rate limit shut down most live attacks."
    ),
    "generative": (
        "This system generates content. Key risks include harmful-content bypass, "
        "deepfake/impersonation, IP regurgitation, and missing provenance. "
        "Layer content moderation, watermarking with C2PA, and IP-aware output filters."
    ),
}
def _build_ai_type_summary(profile):
    """Return the one-paragraph risk summary for *profile*.

    Profiles without an entry in _PROFILE_SUMMARIES get a generic
    "not classified" paragraph.
    """
    if profile in _PROFILE_SUMMARIES:
        return _PROFILE_SUMMARIES[profile]
    return (
        "This AI system has not been classified into a known profile. "
        "Key risks span infrastructure hygiene, access control, and governance — "
        "tighten the universal controls first, then re-classify the system."
    )
def _filter_threats_by_profile(threats, profile):
"""Fix 5 — symmetric per-profile category filter. Each profile keeps only
categories that are genuinely relevant; everything else is dropped so the
report stays profile-shaped, not a kitchen sink. Always allows
Infrastructure + Governance categories so universal hygiene gaps survive.
Attack-graph chains are always kept (they describe actual paths)."""
allowed = {
"ml": {"Classical ML", "ML", "Infrastructure", "Governance",
"Attack Graph"},
"llm": {"LLM", "Infrastructure", "Governance", "Attack Graph"},
"rag": {"RAG", "LLM", "Infrastructure", "Governance",
"Attack Graph"},
"agentic": {"Agentic", "LLM", "MCP", "Infrastructure",
"Governance", "Attack Graph"},
"mcp": {"MCP", "Agentic", "LLM", "Infrastructure",
"Governance", "Attack Graph"},
"generative": {"LLM", "Infrastructure", "Governance", "Attack Graph"},
}
allowed_set = allowed.get(profile)
if not allowed_set:
return threats # unknown profile — keep everything
return [t for t in threats if t.get("threat_category", "") in allowed_set]
# ===========================================================================
# Production-readiness post-processing (severity normalization, dedup,
# capability filter, classification, fix-first selection)
# ===========================================================================
# Findings that should never reach CRITICAL — process / docs gaps amplify
# into noise if scored Critical alongside live exploits.
# Enforced by _apply_severity_overrides(), which caps these ids at the top of
# the HIGH band.
_NEVER_CRITICAL_IDS = {
    "no_model_card", "missing_ai_bom", "iso_42001_gap",
    "missing_ai_incident_plan", "third_party_audit_unready",
    "missing_data_governance", "no_model_unlearning",
    "no_model_explainability", "explainability_high_stakes_llm",
    "audit_readiness", "training_data_governance_gap",
    "training_data_provenance_gap", "training_data_unknown_source",
    "no_adversarial_testing", "no_adv_test_tooling",
}
# Treat anything Governance-categorised or with a "Governance Gap" attacker_goal
# as a governance finding for typing purposes.
_GOVERNANCE_CATEGORIES = {"Governance"}
# Findings whose category is "Infrastructure" but are best classed as
# CONTROL_GAP (missing-control style) rather than full attacker-driven THREAT.
# Kept for backward reference (no longer the primary mechanism).
_CONTROL_GAP_ID_PREFIXES = (
    "missing_", "no_", "unencrypted_", "insecure_", "inadequate_",
    "service_auth_gap", "token_sprawl",
)
# Fix 4 — explicit classification sets. Predictable, no string-prefix surprises.
# Every Infrastructure-category finding that represents a missing technical
# control belongs here; everything else with attack_path is a THREAT.
# _classify_finding_type() checks _GOVERNANCE_IDS before this set, so an id
# listed in both would be typed GOVERNANCE_GAP.
_CONTROL_GAP_IDS = {
    # Universal infrastructure gaps
    "missing_access_control", "missing_ai_gateway", "missing_waf",
    "missing_data_governance", "missing_content_provenance",
    "inadequate_logging", "insecure_secrets",
    "unencrypted_data_rest", "unencrypted_artifacts",
    "no_human_oversight", "no_incident_response",
    "no_output_watermarking",
    "service_auth_gap", "token_sprawl",
    # Cost / abuse / hygiene gaps
    "cost_blindness", "unpatched_environment", "env_patching_gap",
    # Output handling
    "insecure_output_handling",
    # Supply-chain hygiene gaps (technical, not governance)
    "model_supply_chain_risk", "supply_chain_data_risk",
    "embedding_supply_chain", "lora_adapter_supply_chain",
    "agentic_tool_supply_chain", "model_update_poisoning",
    "fine_tune_data_poisoning", "base_model_trust_gap",
    # MCP infra
    "mcp_audit_gap", "mcp_audit_telemetry_gap",
    "mcp_token_mismanagement", "mcp_server_isolation_gap",
    "mcp_transport_insecure", "mcp_shadow_servers_unknown",
    # Multi-tenant / data hygiene
    "multi_tenant_data_leak",
    # Adversarial-test tooling gaps
    "no_adversarial_testing", "no_adv_test_tooling",
    "adv_test_coverage_gap",
    # Override / governance-adjacent control gaps
    "override_abuse", "consequential_decision_no_override",
}
# Fix 4 — explicit governance / docs / process / regulatory ID set.
# Findings with these ids are typed GOVERNANCE_GAP by _classify_finding_type().
_GOVERNANCE_IDS = {
    "no_model_card",
    "missing_ai_bom",
    "iso_42001_gap",
    "missing_ai_incident_plan",
    "third_party_audit_unready",
    "no_model_unlearning",
    "no_model_explainability",
    "explainability_high_stakes_llm",
    "training_data_provenance_gap",
    "training_data_unknown_source",
    "training_data_ip_leakage",
    "eu_ai_act_high_risk_gaps",
    "audit_readiness",
}
# Multi-modal / instruction-injection finding ids that should collapse into a
# single "Instruction Injection (Multi-Modal)" finding with a variants[] list.
# These are root_cause values (not finding ids) that mark a finding as mergeable.
_INJECTION_MERGE_ROOT_CAUSES = {"prompt_injection", "multimodal_injection",
                                "indirect_injection"}
# Finding-id → variant-tag, so we can attribute each merged variant to a modality.
_INJECTION_VARIANT_BY_ID = {
    "prompt_injection": "text",
    "indirect_prompt_injection": "text",
    "full_injection_chain": "text",
    "rag_indirect_injection": "text",
    "multimodal_injection": "image",
    "vision_prompt_injection": "image",
    "multimodal_visual_injection": "image",
    "audio_adversarial_injection": "audio",
    "document_pdf_prompt_injection": "document",
    "browser_agent_injection": "document",
}
def _detect_ai_capabilities(inputs):
"""Return the set of modalities this system handles.
Hardened (Fix 2): only explicit signals — ai_type, model_type, and
explicit toggles. No string heuristics on the free-form `outputs` field.
"""
caps = {"text"} # text is always assumed present
ai_type = inputs.get("ai_type", "")
model_type = inputs.get("model_type", "")
if (ai_type in ("Multimodal AI", "Computer Vision")
or model_type in ("Multimodal Model", "CNN / Computer Vision")):
caps.add("image")
if inputs.get("audio_injection_testing") not in (None, "Not Applicable"):
caps.add("audio")
if (inputs.get("document_pdf_injection") not in (None, "Not Applicable")
or inputs.get("rag_usage") == "Yes"):
caps.add("document")
if (inputs.get("agentic_tool_access") == "Yes"
or inputs.get("agentic_code_execution") == "Yes"):
caps.add("code")
return caps
def _is_multi_tenant(inputs):
"""Explicit multi-tenant detection (Fix 3) — replaces the old
`users != [] AND tenant_isolation weak` heuristic.
Rules:
- exposure == Public → True
- "Customers" / "Anonymous" / "Partners" in users → True
- exposure in {Internal-only, Internal/Private, Back-office, Dev-only} → False
- else → tenant_isolation weak
"""
exposure = inputs.get("exposure")
users = inputs.get("users") or []
tenant_isolation = inputs.get("tenant_isolation")
if exposure == "Public":
return True
if any(u in users for u in ("Customers", "Anonymous", "Anonymous users", "Partners")):
return True
if exposure in ("Internal / Private network only", "Internal-only",
"Back-office batch job", "Developer-only / experimental"):
return False
return tenant_isolation in ("No", "Partial", "Unknown", None)
def _classify_finding_type(t):
    """Fix 4 - type a finding as GOVERNANCE_GAP, CONTROL_GAP or THREAT.

    Explicit and ID-based so the result stays stable across rule edits.
    Precedence: governance id > control-gap id > governance category or
    "Governance Gap" attacker_goal > THREAT (the default).
    """
    finding_id = t.get("id", "")

    # 1. Explicit governance set
    if finding_id in _GOVERNANCE_IDS:
        return "GOVERNANCE_GAP"
    # 2. Explicit control-gap set
    if finding_id in _CONTROL_GAP_IDS:
        return "CONTROL_GAP"
    # 3. Category-driven fallback: covers Governance-category rules whose id
    #    has not been added to the explicit set.
    governance_by_context = (
        t.get("threat_category", "") in _GOVERNANCE_CATEGORIES
        or t.get("attacker_goal", "") == "Governance Gap"
    )
    # 4. Anything else is a THREAT.
    return "GOVERNANCE_GAP" if governance_by_context else "THREAT"
def _affected_capability(t, capabilities):
"""Best-effort capability tag for a finding (text/image/audio/code/document/any)."""
fid = t.get("id", "")
rc = t.get("root_cause", "")
if "audio" in fid or "audio" in rc: return "audio"
if "vision" in fid or "image" in fid or "multimodal" in (rc or fid): return "image"
if "document" in fid or "pdf" in fid: return "document"
if "code" in fid or "executor" in fid: return "code"
if t.get("threat_category") in ("LLM", "RAG", "Agentic", "MCP"):
return "text"
return "any"
def _apply_capability_filter(threats, capabilities, has_agents):
"""Drop findings whose capability isn't present on this system; reduce
impact on tool-misuse findings when has_agents=False."""
out = []
for t in threats:
cap = t.get("affected_capability") or _affected_capability(t, capabilities)
if cap == "image" and "image" not in capabilities:
continue
if cap == "audio" and "audio" not in capabilities:
continue
if cap == "document" and "document" not in capabilities:
continue
if cap == "code" and "code" not in capabilities:
continue
# Without agents, tool-abuse / downstream-chain findings have lower impact.
if not has_agents and t.get("threat_category") == "Agentic":
t["risk_value"] = max(0, t.get("risk_value", 0) - 4)
t["severity"] = _severity_from_risk(t["risk_value"])
out.append(t)
return out
def _apply_exposure_normalisation(threats, exposure):
"""No-op under the new scoring model — exposure_score is now baked into
every finding's risk_score (I × L × E + cgap_mod). Kept as a stub so
older callers don't break; safe to delete in a future cleanup."""
return
def _apply_severity_overrides(threats, inputs, capabilities):
    """Clamp, in place, findings whose severity is too high for their context.

    Rules are evaluated independently for every finding, in this order (a
    finding may match more than one):

      1. id in _NEVER_CRITICAL_IDS, severity Critical      -> cap at HIGH
      2. kv_cache_side_channel, deployment not multi-tenant,
         severity Critical/High                            -> cap at MEDIUM
      3. deepfake_abuse / deepfake_impersonation_risk, no
         "audio" capability, severity Critical/High        -> cap at MEDIUM
      4. bias_exploitation, severity Critical              -> cap at HIGH
      5. data-poisoning ids, no external data ingestion,
         severity Critical                                 -> cap at HIGH
      6. adversarial-evasion ids, use case not
         safety-critical, severity Critical                -> cap at HIGH
      7. drift / distribution-shift ids, project_stage is
         not "Production", severity Critical/High          -> cap at MEDIUM

    Capping lowers risk_score (falling back to risk_value, then 0) to the top
    of the target band - 26 for HIGH, 18 for MEDIUM - mirrors it into
    risk_value, and appends a reason to the finding's "severity_overrides"
    list. Rules 4-6 set the severity label to "High" directly; the other
    rules re-derive it from the capped score with _severity_from_risk().

    Context signals:
      * multi-tenant    - _is_multi_tenant(inputs)
      * external data   - external_sources == "Yes" or non-empty rag_data_sources
      * safety-critical - business_impact == "Critical", or "Safety-critical
                          data" in data_sensitivity, or "Safety-critical" in
                          regulated_domain
    """
    # Band ceilings on the calibration v3 additive scale
    # (0-10 Low / 11-18 Medium / 19-26 High / 27+ Critical).
    high_top = 26
    medium_top = 18

    def clamp(finding, ceiling, reason, forced_label=None):
        """Cap the finding's score at *ceiling* and record *reason*."""
        capped = min(finding.get("risk_score", finding.get("risk_value", 0)), ceiling)
        finding["risk_score"] = capped
        finding["risk_value"] = capped
        if forced_label is None:
            finding["severity"] = _severity_from_risk(capped)
        else:
            finding["severity"] = forced_label
        finding.setdefault("severity_overrides", []).append(reason)

    multi_tenant = _is_multi_tenant(inputs)
    has_audio = "audio" in capabilities
    has_external_data = (inputs.get("external_sources") == "Yes"
                         or bool(inputs.get("rag_data_sources")))
    is_production = inputs.get("project_stage") == "Production"
    is_safety_critical = (
        inputs.get("business_impact") == "Critical"
        or "Safety-critical data" in (inputs.get("data_sensitivity") or [])
        or "Safety-critical" in (inputs.get("regulated_domain") or [])
    )

    top_two = ("Critical", "High")
    deepfake_ids = ("deepfake_abuse", "deepfake_impersonation_risk")
    poisoning_ids = ("data_poisoning_ml", "transfer_learning_poisoning",
                     "fine_tune_data_poisoning")
    evasion_ids = ("adversarial_evasion", "adversarial_evasion_trad_ml",
                   "edge_physical_adversarial_attack")
    drift_ids = ("ml_drift_exploitation", "ml_distribution_shift",
                 "rl_policy_distribution_shift")

    for finding in threats:
        fid = finding.get("id", "")

        # 1. Governance / docs gaps are never CRITICAL.
        if fid in _NEVER_CRITICAL_IDS and finding.get("severity") == "Critical":
            clamp(finding, high_top, "never-critical: governance/docs")

        # 2. KV-cache side channel only rates above MEDIUM when multi-tenant.
        if (fid == "kv_cache_side_channel" and not multi_tenant
                and finding.get("severity") in top_two):
            clamp(finding, medium_top, "downgraded: single-tenant deployment")

        # 3. Deepfake findings only rate above MEDIUM with an audio capability.
        if (fid in deepfake_ids and not has_audio
                and finding.get("severity") in top_two):
            clamp(finding, medium_top, "downgraded: no audio capability")

        # 4. Bias / fairness tops out at HIGH.
        if fid == "bias_exploitation" and finding.get("severity") == "Critical":
            clamp(finding, high_top, "capped at HIGH: bias/fairness", "High")

        # 5. Data poisoning needs an external ingestion path to be CRITICAL.
        if (fid in poisoning_ids and not has_external_data
                and finding.get("severity") == "Critical"):
            clamp(finding, high_top,
                  "capped at HIGH: no external data ingestion", "High")

        # 6. Adversarial evasion is CRITICAL only for safety-critical use.
        if (fid in evasion_ids and not is_safety_critical
                and finding.get("severity") == "Critical"):
            clamp(finding, high_top,
                  "capped at HIGH: not safety-critical use", "High")

        # 7. Drift / distribution shift is capped at MEDIUM outside Production.
        if (fid in drift_ids and not is_production
                and finding.get("severity") in top_two):
            clamp(finding, medium_top,
                  "capped at MEDIUM: drift only matters in Production")
def _apply_context_hard_caps(threats, inputs):
    """Clamp finding severities in place according to deployment context.

    Operates on already-scored findings (dicts) and mutates them; returns
    ``None``. Three rules are evaluated per finding, in this order:

    1. MEDIUM cap -- ``business_impact == "Low"``: Critical / High findings
       have their score limited to 18 and are re-bucketed through
       ``_severity_from_risk``. ``GOVERNANCE_GAP`` findings are skipped.
    2. HIGH cap -- ``project_stage != "Production"`` or a classical-ML
       ``ai_type``: Critical findings become High with a score of at most 26.
       This rule has no governance exemption.
    3. No-automation step-down -- none of ``auto_action``, ``plugin_access``,
       ``external_systems`` equals ``"Yes"``: the finding drops exactly one
       severity level (Critical -> High -> Medium -> Low) and its score is
       limited to the top of the new band. ``GOVERNANCE_GAP`` findings are
       skipped.

    Every adjustment appends a human-readable reason to the finding's
    ``severity_overrides`` list. ``risk_value`` is always kept equal to
    ``risk_score`` on any finding that is touched.
    """
    high_ceiling, medium_ceiling, low_ceiling = 26, 18, 10
    # Current severity -> (severity one level lower, score ceiling of that band)
    one_level_down = {
        "Critical": ("High", high_ceiling),
        "High": ("Medium", medium_ceiling),
        "Medium": ("Low", low_ceiling),
    }
    classical_ml = {
        "Traditional ML",
        "Computer Vision",
        "NLP (Non-LLM)",
        "Recommendation System",
        "Anomaly Detection / Fraud Detection",
        "Classical ML / Predictive Model",
    }

    stage = inputs.get("project_stage", "")
    ai_type = inputs.get("ai_type", "")
    cap_high = stage != "Production" or ai_type in classical_ml
    cap_medium = inputs.get("business_impact", "") == "Low"
    no_automation = not any(
        inputs.get(flag) == "Yes"
        for flag in ("auto_action", "plugin_access", "external_systems")
    )
    if not cap_high and not cap_medium and not no_automation:
        return

    def _clamp(finding, ceiling):
        # Limit the score (falling back to risk_value, then 0) and mirror it.
        current = finding.get("risk_score", finding.get("risk_value", 0))
        finding["risk_score"] = min(current, ceiling)
        finding["risk_value"] = finding["risk_score"]

    def _note(finding, reason):
        finding.setdefault("severity_overrides", []).append(reason)

    for finding in threats:
        is_governance = finding.get("finding_type") == "GOVERNANCE_GAP"

        # Rule 1: the more aggressive MEDIUM cap goes first.
        if (cap_medium and not is_governance
                and finding.get("severity") in ("Critical", "High")):
            _clamp(finding, medium_ceiling)
            finding["severity"] = _severity_from_risk(finding["risk_score"])
            _note(finding, "hard cap MEDIUM: business_impact=Low")

        # Rule 2: HIGH cap for non-production stages / classical-ML systems.
        if cap_high and finding.get("severity") == "Critical":
            _clamp(finding, high_ceiling)
            finding["severity"] = "High"
            triggers = [
                label
                for applies, label in (
                    (stage != "Production", f"stage={stage}"),
                    (ai_type in classical_ml, f"ai_type={ai_type}"),
                )
                if applies
            ]
            _note(finding, "hard cap HIGH: " + ", ".join(triggers))

        # Rule 3: one level off when the system cannot act on the outside
        # world. Evaluated on the severity left behind by rules 1 and 2.
        if no_automation and not is_governance:
            lowered = one_level_down.get(finding.get("severity"))
            if lowered:
                finding["severity"] = lowered[0]
                _clamp(finding, lowered[1])
                _note(finding,
                      "step-down: no automation / plugin / external systems")
def _cap_critical(threats, max_critical=5):
    """Limit the number of Critical findings to ``max_critical``.

    When more findings than that carry severity ``"Critical"``, the
    highest-scoring ``max_critical`` of them are left alone and the rest are
    downgraded in place to ``"High"`` with their score limited to 26. Ties
    keep their original list order. Each downgraded finding gets a note in
    ``severity_overrides``. Returns ``None``.
    """
    def _score(finding):
        # risk_score wins; fall back to risk_value, then 0.
        return finding.get("risk_score", finding.get("risk_value", 0))

    critical = [f for f in threats if f.get("severity") == "Critical"]
    overflow = len(critical) - max_critical
    if overflow <= 0:
        return

    # Stable descending sort: equal scores stay in their original order.
    ranked = sorted(critical, key=_score, reverse=True)
    reason = f"downgraded by cap (>{max_critical} Critical findings)"
    for finding in ranked[max_critical:]:
        finding["severity"] = "High"
        limited = min(_score(finding), 26)
        finding["risk_score"] = limited
        finding["risk_value"] = limited
        finding.setdefault("severity_overrides", []).append(reason)
def _merge_injection_variants(threats, capabilities):
    """Collapse instruction-injection variants into one multi-modal finding.

    A finding counts as an injection variant when its ``root_cause`` is in
    ``_INJECTION_MERGE_ROOT_CAUSES`` or its ``id`` is a key of
    ``_INJECTION_VARIANT_BY_ID``. With fewer than two such findings the input
    list is returned unchanged.

    Otherwise a copy of the member with the highest ``risk_value`` becomes
    the merged finding. Its id / title / description / attack path /
    mitigation are replaced with the generic multi-modal wording, OWASP and
    MITRE tags are unioned across all members, and the first non-empty
    ``quick_win`` encountered is used.

    Args:
        threats: list of finding dicts; not mutated.
        capabilities: container of modality tags; a variant tag is listed in
            ``variants`` only when it is present here.

    Returns:
        A new list in which the first injection member is replaced by the
        merged finding and all other members are dropped.
    """
    members = [
        t for t in threats
        if t.get("root_cause") in _INJECTION_MERGE_ROOT_CAUSES
        or t.get("id") in _INJECTION_VARIANT_BY_ID
    ]
    if len(members) < 2:
        return threats

    # Keep a handle on the ORIGINAL highest-risk dict. The merged finding is
    # a copy of it, so identity checks against the copy can never match a
    # list member; comparing against `source` is what excludes the primary
    # from its own supporting_findings.
    source = max(members, key=lambda t: t.get("risk_value", 0))

    variants_seen = []
    merged_owasp = set()
    merged_mitre = set()
    merged_quick = []
    for t in members:
        vtag = _INJECTION_VARIANT_BY_ID.get(t.get("id", ""), "text")
        if vtag in capabilities and vtag not in variants_seen:
            variants_seen.append(vtag)
        # Tags are stored as "A | B | C" strings; split, trim, drop blanks.
        merged_owasp.update(
            tok.strip() for tok in (t.get("owasp", "") or "").split("|") if tok.strip()
        )
        merged_mitre.update(
            tok.strip() for tok in (t.get("mitre", "") or "").split("|") if tok.strip()
        )
        quick = t.get("quick_win")
        if quick and quick not in merged_quick:
            merged_quick.append(quick)

    # Fall back to plain text when no member's modality is an active capability.
    variant_names = variants_seen or ["text"]

    merged = dict(source)
    merged["id"] = "instruction_injection_multimodal"
    merged["title"] = "Instruction Injection (Multi-Modal)"
    merged["threat"] = merged["title"]
    merged["variants"] = variant_names
    merged["owasp"] = " | ".join(sorted(merged_owasp)) or merged.get("owasp", "")
    merged["mitre"] = " | ".join(sorted(merged_mitre)) or merged.get("mitre", "")
    merged["description"] = (
        "Untrusted content delivered through one or more input modalities "
        "(" + ", ".join(variant_names) + ") embeds hidden "
        "instructions that the model interprets as system commands, leading "
        "to attacker-controlled output and downstream impact."
    )
    merged["attack_path"] = [
        "Untrusted input received",
        "Hidden / malicious instruction extracted",
        "Model interprets as system command",
        "Attacker-controlled output generated",
        "Downstream system impacted",
    ]
    merged["mitigation"] = (
        "Sanitise every input modality (text, OCR'd image text, STT, "
        "document/PDF extraction) through a prompt-injection classifier "
        "before context insertion. Treat all retrieved content as untrusted. "
        "Apply output filtering before downstream sinks."
    )
    if merged_quick:
        merged["quick_win"] = merged_quick[0]

    # Supporting findings = whatever the primary already carried, plus the
    # ids of every OTHER member. Missing ids are skipped instead of raising.
    supporting = list(source.get("supporting_findings") or [])
    for t in members:
        if t is source:
            continue
        member_id = t.get("id")
        if member_id is not None and member_id not in supporting:
            supporting.append(member_id)
    merged["supporting_findings"] = supporting

    # Rebuild the list: the merged finding takes the slot of the first
    # injection member; every other member is dropped.
    member_keys = {id(t) for t in members}
    out = []
    placed = False
    for t in threats:
        if id(t) not in member_keys:
            out.append(t)
        elif not placed:
            out.append(merged)
            placed = True
    return out
def _dedupe_by_root_and_mitigation(threats):
    """Collapse findings that share a root cause AND identical mitigation text.

    Findings are grouped by ``(root_cause, mitigation)``; a finding with a
    missing or empty ``root_cause`` is keyed by its ``id`` instead, so such
    findings are never merged merely because their mitigation text matches.
    Within a group the finding with the highest ``risk_value`` survives; on
    a tie the earlier finding wins.

    The survivor's ``supporting_findings`` list receives the id of every
    finding it absorbed, including ids the absorbed finding had itself
    collected earlier, so nothing is lost when a later finding outranks the
    current survivor. Survivors are mutated in place.

    Args:
        threats: list of finding dicts.

    Returns:
        List of surviving findings, ordered by first appearance of each group.
    """
    survivors = {}  # (root, mitigation) -> finding; dict keeps first-seen order
    for finding in threats:
        root = finding.get("root_cause") or finding.get("id")
        key = (root, finding.get("mitigation", ""))
        incumbent = survivors.get(key)
        if incumbent is None:
            survivors[key] = finding
            continue

        if finding.get("risk_value", 0) > incumbent.get("risk_value", 0):
            winner, loser = finding, incumbent
        else:
            winner, loser = incumbent, finding

        # Record the loser (and anything it had absorbed) on the WINNER, so
        # the link stays on the finding that is actually returned.
        absorbed = list(winner.get("supporting_findings") or [])
        inherited = [loser.get("id")] + list(loser.get("supporting_findings") or [])
        for finding_id in inherited:
            if finding_id is not None and finding_id not in absorbed:
                absorbed.append(finding_id)
        winner["supporting_findings"] = absorbed
        survivors[key] = winner
    return list(survivors.values())
def _dedupe_by_root_cause(threats):
    """Keep exactly one finding per root cause (calibration v3, problem 6).

    Findings are grouped by ``root_cause`` (or by ``id`` when the root cause
    is missing or empty). The member with the highest score -- ``risk_score``,
    falling back to ``risk_value``, then 0 -- survives; on a tie the earlier
    one is kept. Each displaced member is summarised as an
    ``{id, title, severity, risk_score}`` entry in the survivor's
    ``variants`` list, and any variants it had already gathered are carried
    over. Survivors are mutated in place.

    Returns the survivors ordered by first appearance of their root cause.
    """
    def _score(finding):
        return finding.get("risk_score", finding.get("risk_value", 0))

    def _summary(finding):
        return {
            "id": finding.get("id"),
            "title": finding.get("title") or finding.get("threat"),
            "severity": finding.get("severity"),
            "risk_score": _score(finding),
        }

    # Insertion-ordered: re-assigning a key keeps its original position.
    survivors = {}
    for candidate in threats:
        root = candidate.get("root_cause") or candidate.get("id")
        if root not in survivors:
            survivors[root] = candidate
            continue

        incumbent = survivors[root]
        if _score(candidate) > _score(incumbent):
            winner, loser = candidate, incumbent
        else:
            winner, loser = incumbent, candidate

        collected = winner.setdefault("variants", [])
        collected.append(_summary(loser))
        # Inherit whatever the loser had absorbed earlier, skipping duplicates.
        for inherited in loser.get("variants") or []:
            if inherited not in collected:
                collected.append(inherited)
        survivors[root] = winner
    return list(survivors.values())
def _apply_context_damping(threats, inputs):
"""Reduce per-finding impact / likelihood when the system context says
the worst-case severity is unrealistic.
Damping rules (additive):
• business_impact = "Low" → impact −2
• business_impact = "Moderate" → impact −1
• project_stage in (Idea/PoC, Development) → likelihood −1
• project_stage == Retired → likelihood −2
• no auto_action AND no plugin_access AND no external_systems
→ impact −1
• exposure in (Internal-only, Internal/Private,
Back-office batch job, Developer-only)
→ likelihood −1
Governance findings and graph-chain attacks are NOT damped — they
represent stage-independent obligations / structural attack paths.
"""
bi = inputs.get("business_impact", "Moderate")
stage = inputs.get("project_stage", "Development")
expo = inputs.get("exposure", "")
di = 0
dl = 0
if bi == "Low":
di -= 2
elif bi == "Moderate":
di -= 1
if stage in ("Idea / PoC", "Development"):
dl -= 1
elif stage == "Retired / Decommissioning":
dl -= 2
if (inputs.get("auto_action") != "Yes"
and inputs.get("plugin_access") != "Yes"
and inputs.get("external_systems") != "Yes"):
di -= 1
if expo in ("Internal-only", "Internal / Private network only",
"Back-office batch job", "Developer-only / experimental"):
dl -= 1
if di == 0 and dl == 0:
return
for t in threats:
if t.get("finding_type") == "GOVERNANCE_GAP":
continue
if t.get("is_graph_chain"):
continue
t["impact_score"] = max(1, t.get("impact_score", 3) + di)
t["likelihood_score"] = max(1, t.get("likelihood_score", 3) + dl)
_recompute_risk(t)
if di or dl:
t.setdefault("risk_boosts", []).append(