Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions cyberai/agents/intel/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from cyberai.core.scan_session import Severity

from .nvd_client import get_cve, search_cves
from .epss_client import get_epss_scores
from .service_mapper import ports_to_queries, score_to_severity
from cyberai.core.types import CVEEntry, IntelResult

Expand Down Expand Up @@ -62,6 +63,18 @@ def run(self, target: str, context: Optional[Dict[str, Any]] = None) -> Dict[str
all_cves.extend(result.get("cves", []))
time.sleep(0.6)

# Enrich CVEs with EPSS scores (probability of exploitation
# in the wild in the next 30 days). Single batched call.
cve_ids = [c["id"] for c in all_cves if c.get("id")]
if cve_ids:
epss_map = get_epss_scores(cve_ids)
for cve in all_cves:
cve["epss"] = epss_map.get(cve.get("id"), 0.0)
self._log(
f"EPSS enrichment: {sum(1 for v in epss_map.values() if v > 0)}/"
f"{len(cve_ids)} CVEs with non-zero score"
)

self.kb.set("intel.cves", all_cves, agent=self.AGENT_NAME)
self._log(f"found {len(all_cves)} CVEs for {len(queries)} services")

Expand Down
22 changes: 16 additions & 6 deletions cyberai/agents/intel/cve_scorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
from datetime import datetime, timezone
from typing import Any, Dict

# Weights of the individual scoring signals; together they sum to 1.0.
# Each name is assigned exactly once — the superseded values (0.45 / 0.15 /
# 0.10) were dead assignments that were immediately overwritten.
CVSS_WEIGHT = 0.35
EXPLOIT_WEIGHT = 0.30
RECENCY_WEIGHT = 0.10
EPSS_WEIGHT = 0.25  # day 11: EPSS is a strong signal, not a footnote


@dataclass
Expand Down Expand Up @@ -99,7 +99,11 @@ def _recency_bonus(cve: Dict) -> float:


def _epss_bonus(cve: Dict) -> float:
    """Return the EPSS contribution to the score, capped at ``EPSS_WEIGHT``.

    The raw probability is read from ``cve["epss"]``; a missing or falsy
    value counts as 0.0. Up to 0.5 the bonus is linear
    (``epss * EPSS_WEIGHT``). Above 0.5 the product is doubled before the
    cap is applied, so every such CVE receives the full ``EPSS_WEIGHT`` and
    weaponized vulnerabilities float to the top.
    """
    epss = float(cve.get("epss") or 0.0)
    # Nonlinear boost: doubling any value above 0.5 always exceeds the cap.
    boost = 2.0 if epss > 0.5 else 1.0
    return min(epss * EPSS_WEIGHT * boost, EPSS_WEIGHT)


def _tier(score: float) -> str:
Expand All @@ -122,6 +126,12 @@ def _reasoning(cvss: float, exploit: float, recency: float, epss: float) -> str:
parts.append("weaponized/in-wild")
if recency > 0.12:
parts.append("recent CVE")
if epss > 0.05:
parts.append(f"EPSS={epss/EPSS_WEIGHT:.0%}")
# raw EPSS probability — recovered from the weighted bonus
epss_raw = epss / EPSS_WEIGHT
if epss_raw > 0.5:
parts.append(f"\U0001f525 EPSS={epss_raw:.0%}") # high exploitation likelihood
elif epss_raw > 0.2:
parts.append(f"\u26a0 EPSS={epss_raw:.0%}") # moderate
elif epss > 0.01:
parts.append(f"EPSS={epss_raw:.0%}")
return " | ".join(parts)
78 changes: 78 additions & 0 deletions cyberai/agents/intel/epss_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""EPSS (Exploit Prediction Scoring System) client β€” api.first.org.

EPSS gives a probability (0.0-1.0) that a CVE will be exploited in the
wild in the next 30 days. Updated daily by FIRST.org. Free, no API key.
"""
from __future__ import annotations

from pathlib import Path
from typing import Dict, List

import httpx

from cyberai.core.cache import FileCache

# Endpoint queried by get_epss_scores() for EPSS data.
EPSS_BASE = "https://api.first.org/data/v1/epss"
# Maximum number of CVE IDs sent in one request.
EPSS_BATCH_SIZE = 100
EPSS_CACHE_TTL = 3600 * 24  # 24h — EPSS recomputes once a day

# Module-wide cache rooted at ~/.cyberai/epss-cache; get_epss_scores()
# stores one entry per CVE under the key "epss:<CVE id>".
_epss_cache = FileCache(
    cache_dir=Path.home() / ".cyberai" / "epss-cache",
    ttl=EPSS_CACHE_TTL,
)


def get_epss_scores(cve_ids: List[str]) -> Dict[str, float]:
    """Fetch EPSS scores for a list of CVE IDs.

    Batches in groups of ``EPSS_BATCH_SIZE`` and caches per-CVE for 24h.
    CVEs not covered by EPSS silently get 0.0. HTTP failures and malformed
    payloads degrade to 0.0 — the pipeline must survive an EPSS outage.

    Args:
        cve_ids: CVE identifiers to look up. Duplicates are looked up once.

    Returns:
        A mapping from every requested CVE ID to its EPSS probability
        (0.0 when the score is unknown or could not be retrieved).
    """
    if not cve_ids:
        return {}

    scores: Dict[str, float] = {}
    to_fetch: List[str] = []

    # 1. cache lookup — dict.fromkeys de-duplicates while keeping order, so
    #    a repeated ID never wastes a slot in a request batch.
    for cid in dict.fromkeys(cve_ids):
        hit = _epss_cache.get(f"epss:{cid}")
        if hit is not None:
            scores[cid] = float(hit)
        else:
            to_fetch.append(cid)

    # 2. fetch missing in batches
    for i in range(0, len(to_fetch), EPSS_BATCH_SIZE):
        batch = to_fetch[i:i + EPSS_BATCH_SIZE]
        try:
            resp = httpx.get(
                EPSS_BASE,
                params={"cve": ",".join(batch)},
                timeout=15,
            )
            resp.raise_for_status()
            data = resp.json().get("data")
        except Exception:
            # Silent fallback — every CVE in this batch -> 0.0, no cache.
            data = None

        # A missing, null or non-list "data" field means "no rows".
        if not isinstance(data, list):
            data = []

        seen = set()
        for row in data:
            try:
                cid = row.get("cve")
                epss = float(row.get("epss") or 0.0)
            except (AttributeError, TypeError, ValueError):
                # Malformed row (not a mapping, or a non-numeric score):
                # skip it so the CVE falls back to the uncached 0.0 below.
                continue
            if cid:
                scores[cid] = epss
                _epss_cache.set(f"epss:{cid}", epss)
                seen.add(cid)

        # CVEs the API didn't return — default to 0.0 without caching
        # (they might be added to EPSS later).
        for cid in batch:
            if cid not in seen:
                scores.setdefault(cid, 0.0)

    return scores
29 changes: 29 additions & 0 deletions tests/fixtures/epss_log4shell.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
"status": "OK",
"status-code": 200,
"version": "1.0",
"access": "public",
"total": 3,
"offset": 0,
"limit": 100,
"data": [
{
"cve": "CVE-2021-44228",
"epss": "0.97539",
"percentile": "0.99987",
"date": "2025-05-01"
},
{
"cve": "CVE-2019-0708",
"epss": "0.94120",
"percentile": "0.99850",
"date": "2025-05-01"
},
{
"cve": "CVE-2020-1234",
"epss": "0.00045",
"percentile": "0.05123",
"date": "2025-05-01"
}
]
}
18 changes: 17 additions & 1 deletion tests/unit/test_cve_scorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,21 @@ def test_exploit_bonus_all_signals():


def test_epss_bonus_high():
    """High EPSS (>0.5) triggers the day-11 multiplier boost and clamps
    to the full EPSS_WEIGHT — strongest possible EPSS signal.

    CRITICAL_CVE is presumed to carry an EPSS above 0.5 (the fixture is
    defined elsewhere in this module — confirm there).
    """
    s = score_cve(CRITICAL_CVE)
    # The former expectation of 0.095 belonged to the old linear formula and
    # cannot hold together with the clamped value asserted here.
    assert s.epss_bonus == pytest.approx(EPSS_WEIGHT, abs=1e-6)


def test_epss_bonus_low_is_linear():
    """An EPSS of 0.10 (below the 0.5 boost threshold) scales linearly."""
    scored = score_cve({"cve_id": "CVE-x", "cvss": 5.0, "epss": 0.10})
    expected_bonus = 0.10 * EPSS_WEIGHT
    assert scored.epss_bonus == pytest.approx(expected_bonus, abs=1e-6)


def test_epss_bonus_zero():
    """An EPSS of exactly zero adds nothing to the score."""
    scored = score_cve({"cve_id": "CVE-y", "cvss": 5.0, "epss": 0.0})
    assert scored.epss_bonus == 0.0
117 changes: 117 additions & 0 deletions tests/unit/test_epss.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
"""Unit tests for EPSS client + prioritizer integration (day 11).

Uses a captured api.first.org response (epss_log4shell.json) to drive
deterministic tests without hitting the network.
"""
from __future__ import annotations

import json
from pathlib import Path
from unittest.mock import patch, MagicMock

import pytest

from cyberai.agents.intel import epss_client
from cyberai.agents.intel.epss_client import get_epss_scores
from cyberai.agents.intel.risk_prioritizer import prioritize


# Captured api.first.org response, located relative to this test module
# (two directories up, then fixtures/epss_log4shell.json).
FIXTURE = Path(__file__).parent.parent / "fixtures" / "epss_log4shell.json"


@pytest.fixture
def epss_response() -> dict:
    """Provide the captured EPSS payload parsed from ``FIXTURE``."""
    raw_text = FIXTURE.read_text()
    payload = json.loads(raw_text)
    return payload


@pytest.fixture(autouse=True)
def _clean_epss_cache():
    """Empty the EPSS client cache before and after every test."""

    def _wipe() -> None:
        # Looked up at call time, exactly like the inline calls it replaces.
        epss_client._epss_cache.clear()

    _wipe()
    yield
    _wipe()


def _mock_resp(payload: dict) -> MagicMock:
m = MagicMock()
m.json.return_value = payload
m.raise_for_status.return_value = None
return m


# ── EPSS client ──────────────────────────────────────────────────────

def test_client_empty_input():
    """An empty list of CVE IDs yields an empty mapping."""
    result = get_epss_scores([])
    assert result == {}


def test_client_parses_fixture(epss_response):
    """The captured api.first.org response shape is parsed into floats."""
    requested = ["CVE-2021-44228", "CVE-2019-0708", "CVE-2020-1234"]
    fake_get = patch.object(
        epss_client.httpx, "get", return_value=_mock_resp(epss_response)
    )
    with fake_get:
        scores = get_epss_scores(requested)

    expected = {
        "CVE-2021-44228": 0.97539,
        "CVE-2019-0708": 0.94120,
        "CVE-2020-1234": 0.00045,
    }
    for cve_id, value in expected.items():
        assert scores[cve_id] == pytest.approx(value)


def test_client_caches_after_fetch(epss_response):
    """A repeated lookup of the same CVE is served without a second HTTP call."""
    response = _mock_resp(epss_response)
    with patch.object(
        epss_client.httpx, "get", return_value=response
    ) as http_get:
        for _ in range(2):
            get_epss_scores(["CVE-2021-44228"])
    assert http_get.call_count == 1


def test_client_missing_cve_defaults_to_zero(epss_response):
    """A requested CVE absent from the API response is reported as 0.0."""
    unknown_id = "CVE-9999-9999"
    response = _mock_resp(epss_response)
    with patch.object(epss_client.httpx, "get", return_value=response):
        scores = get_epss_scores(["CVE-2021-44228", unknown_id])
    assert scores[unknown_id] == 0.0


def test_client_http_failure_silent_zero():
    """A failing HTTP call yields 0.0 for the CVE instead of raising."""
    failing_get = patch.object(
        epss_client.httpx, "get", side_effect=Exception("boom")
    )
    with failing_get:
        scores = get_epss_scores(["CVE-2021-44228"])
    expected = {"CVE-2021-44228": 0.0}
    assert scores == expected


# ── prioritizer with EPSS enrichment ─────────────────────────────────

def test_log4shell_tops_priority_thanks_to_epss():
    """Log4Shell (EPSS 0.974, exploit flags set) is ranked ahead of an older
    CVSS 9.0 entry whose EPSS is only 0.02."""
    stale_high_cvss = {
        "cve_id": "CVE-2019-OLD", "cvss": 9.0, "epss": 0.02,
        "description_short": "old high CVSS, no exploitation",
        "published_date": "2019-01-01T00:00:00",
    }
    log4shell = {
        "cve_id": "CVE-2021-44228", "cvss": 10.0, "epss": 0.974,
        "description_short": "Log4Shell",
        "metasploit": True, "exploited_in_wild": True,
        "published_date": "2021-12-10T00:00:00",
    }

    ranked = prioritize([stale_high_cvss, log4shell])
    top, runner_up = ranked[0], ranked[1]

    assert top["cve_id"] == "CVE-2021-44228"
    assert top["composite_score"] > runner_up["composite_score"]


def test_high_epss_emoji_in_reasoning():
    """The 🔥 marker shows up in the reasoning text for an EPSS of 0.974
    (day-11 visual signal for EPSS > 0.5)."""
    log4shell = {
        "cve_id": "CVE-2021-44228", "cvss": 10.0, "epss": 0.974,
        "published_date": "2021-12-10T00:00:00",
    }
    top = prioritize([log4shell])[0]
    assert "\U0001f525" in top["reasoning"]
Loading