Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions cyberai/agents/exploit/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@

from .attack_path import AttackPath, build_attack_paths
from .chain_builder import build_exploit_chain
from cyberai.core.types import (
AttackPath as AttackPathModel,
ExploitChain as ExploitChainModel,
ExploitResult,
)
from .cvss_analyzer import analyze_attack_vector

console = Console()
Expand Down Expand Up @@ -69,6 +74,21 @@ def run(self, target: str, context: Optional[Dict[str, Any]] = None) -> Dict[str
"exploit_chain": chain,
"ai_analysis": analysis,
}
# Build a validated ExploitResult and store it in the KB.
exploit_result = ExploitResult(
target=target,
attack_paths=[
AttackPathModel(**p.to_dict()) for p in attack_paths
],
chain=(
ExploitChainModel(**chain)
if isinstance(chain, dict) else None
),
ai_analysis=analysis,
)
self.kb.set(
"exploit.result", exploit_result.model_dump(), agent=self.AGENT_NAME
)
self.kb.set("exploit", result, agent=self.AGENT_NAME)
self._log("Exploit analysis complete", {"paths_found": len(attack_paths)})
return result
Expand Down
25 changes: 25 additions & 0 deletions cyberai/agents/intel/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from .nvd_client import get_cve, search_cves
from .service_mapper import ports_to_queries, score_to_severity
from cyberai.core.types import CVEEntry, IntelResult


class IntelAgent(BaseAgent):
Expand Down Expand Up @@ -80,6 +81,30 @@ def run(self, target: str, context: Optional[Dict[str, Any]] = None) -> Dict[str
(cve.get("cvss", {}) or {}).get("vector", "")],
)

# Build a validated IntelResult and store it in the KB.
def _cvss_score(c: dict) -> float:
raw = c.get("cvss")
if isinstance(raw, dict):
return float(raw.get("score") or 0.0)
return float(raw or 0.0)

intel_result = IntelResult(
target=target,
cves=[
CVEEntry(
id=c.get("id") or c.get("cve_id", ""),
cvss=_cvss_score(c),
severity=score_to_severity(_cvss_score(c)),
description=c.get("description", ""),
published=c.get("published") or None,
exploited_in_wild=c.get("exploited_in_wild", False),
epss=float(c.get("epss") or 0.0),
)
for c in all_cves
],
)
self.kb.set("intel.result", intel_result.model_dump(), agent=self.AGENT_NAME)

result = {
"status": "done",
"queries": queries,
Expand Down
14 changes: 14 additions & 0 deletions cyberai/agents/recon/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from cyberai.core.base_agent import BaseAgent, Tool
from cyberai.core.scan_session import Severity
from cyberai.core.types import OpenPort, ReconResult

from .dns_tool import detect_subdomains, run_dns, run_whois
from .nmap_tool import run_nmap
Expand Down Expand Up @@ -88,4 +89,17 @@ def run(self, target: str, context: Optional[Dict[str, Any]] = None) -> Dict[str
evidence=[str(p) for p in ports],
)

# Build a validated pydantic ReconResult and store it in the KB.
recon_result = ReconResult(
target=target,
ports=[OpenPort(**p) for p in ports if isinstance(p, dict)],
whois=whois_result if isinstance(whois_result, dict) else {},
dns=dns_result if isinstance(dns_result, dict) else {},
subdomains=(
sub_result.get("subdomains", [])
if isinstance(sub_result, dict) else []
),
)
self.kb.set("recon.result", recon_result.model_dump(), agent=self.AGENT_NAME)

return {"status": "done", "kb_keys": list(results.keys()), "ports": ports}
71 changes: 66 additions & 5 deletions cyberai/core/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from typing import Any, Union
from pathlib import Path

from pydantic import BaseModel, Field

# Target types
Target = str # IP, CIDR, or domain
PortNumber = int # 1-65535
Expand All @@ -16,18 +18,77 @@

# Recon
PortList = list[PortNumber]
ServiceMap = dict[str, ServiceName] # "80" → "http"
ReconResult = dict[str, Any]
ServiceMap = dict[str, ServiceName] # "80" -> "http"


class OpenPort(BaseModel):
"""A single open port discovered during recon."""
port: int
protocol: str = "tcp"
service: str = "unknown"
version: str | None = None


class ReconResult(BaseModel):
"""Structured output of the ReconAgent."""
target: str
ports: list[OpenPort] = Field(default_factory=list)
whois: dict[str, Any] = Field(default_factory=dict)
dns: dict[str, Any] = Field(default_factory=dict)
subdomains: list[str] = Field(default_factory=list)

# Intel
CVEId = str # "CVE-2024-1234"
CVEList = list[CVEId]
RiskLevel = str # "CRITICAL" | "HIGH" | "MEDIUM" | "LOW"
IntelResult = dict[str, Any]


class CVEEntry(BaseModel):
"""A single CVE with scoring and threat-intel context."""
id: str
cvss: float = 0.0
severity: str = "UNKNOWN"
description: str = ""
published: str | None = None
exploited_in_wild: bool = False
epss: float = 0.0


class IntelResult(BaseModel):
"""Structured output of the IntelAgent."""
target: str
cves: list[CVEEntry] = Field(default_factory=list)
services: dict[str, Any] = Field(default_factory=dict)

# Exploit
AttackPath = dict[str, Any]
ExploitResult = dict[str, Any]


class AttackPath(BaseModel):
"""A single attack path derived from one CVE."""
cve_id: str
attack_vector: str = "Unknown"
attack_complexity: str = "Unknown"
technique: str = ""
success_probability: float = 0.0
requires_auth: bool = False
requires_interaction: bool = False
notes: str = ""


class ExploitChain(BaseModel):
"""An ordered, MITRE-mapped sequence of exploitation steps."""
target: str
chain_length: int = 0
steps: list[dict[str, Any]] = Field(default_factory=list)
summary: str = ""


class ExploitResult(BaseModel):
"""Structured output of the ExploitAgent."""
target: str
attack_paths: list[AttackPath] = Field(default_factory=list)
chain: ExploitChain | None = None
ai_analysis: str = ""

# Report
ReportPath = Path
Expand Down
Loading