Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 137 additions & 35 deletions cyberai/core/scan_session.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,37 @@
"""
ScanSession — full lifecycle manager for a pentest scan.
Ties together: ReconAgent → IntelAgent → ExploitAgent → ReportAgent
ScanSession — single source of truth for a pentest scan lifecycle.

This module holds:
* ScanSession — the lifecycle object (created → running → completed/failed)
* ScanState, ScanPhase — state enums
* PhaseResult — per-phase outcome record
* Severity, Finding — vulnerability finding model (moved here from session.py)

The legacy `cyberai.core.session` module re-exports everything from here
for backward compatibility; new code should import from `scan_session`.
"""
from __future__ import annotations

import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional
import uuid


# ── enums ─────────────────────────────────────────────────────────────


class ScanState(str, Enum):
CREATED = "created"
RUNNING = "running"
RECON = "recon"
INTEL = "intel"
EXPLOIT = "exploit"
REPORT = "report"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
CREATED = "created"
RUNNING = "running"
RECON = "recon"
INTEL = "intel"
EXPLOIT = "exploit"
REPORT = "report"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"


class ScanPhase(str, Enum):
Expand All @@ -29,6 +41,51 @@ class ScanPhase(str, Enum):
REPORT = "report"


class Severity(str, Enum):
CRITICAL = "CRITICAL"
HIGH = "HIGH"
MEDIUM = "MEDIUM"
LOW = "LOW"
INFO = "INFO"


# ── models ────────────────────────────────────────────────────────────


@dataclass
class Finding:
"""
A single vulnerability finding.

Fields target/evidence/cve_ids added in day 3 (KI-5 fix) — agents
were already passing these but the dataclass didn't accept them.
"""
id: int
severity: Severity
title: str
description: str
timestamp: str
agent: str

# Legacy single-CVE field (kept for backward compat with old callers)
cve: Optional[str] = None
# New: list of CVEs (some findings reference multiple)
cve_ids: List[str] = field(default_factory=list)
# New: target this finding was made against (host, URL, contract addr)
target: Optional[str] = None
# New: artifacts proving the finding (nmap output, request/response, etc.)
evidence: List[Any] = field(default_factory=list)
# Free-form structured data
data: Any = None

def __post_init__(self) -> None:
# Keep `cve` and `cve_ids` in sync for callers that use either
if self.cve and not self.cve_ids:
self.cve_ids = [self.cve]
elif self.cve_ids and not self.cve:
self.cve = self.cve_ids[0]


@dataclass
class PhaseResult:
phase: ScanPhase
Expand All @@ -40,18 +97,24 @@ class PhaseResult:
error: Optional[str] = None


# ── session ───────────────────────────────────────────────────────────


@dataclass
class ScanSession:
target: str
session_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
state: ScanState = ScanState.CREATED
created_at: str = field(default_factory=lambda: _now())
started_at: Optional[str] = None
ended_at: Optional[str] = None
phases: List[PhaseResult] = field(default_factory=list)
kb: Dict[str, Any] = field(default_factory=dict)
errors: List[str] = field(default_factory=list)
authorized_scope: List[str] = field(default_factory=list)
target: str
session_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
state: ScanState = ScanState.CREATED
created_at: str = field(default_factory=lambda: _now())
started_at: Optional[str] = None
ended_at: Optional[str] = None
phases: List[PhaseResult] = field(default_factory=list)
kb: Dict[str, Any] = field(default_factory=dict)
errors: List[str] = field(default_factory=list)
authorized_scope: List[str] = field(default_factory=list)

# Findings live on the session — added in day 3 to unify with PentestSession
findings: List[Finding] = field(default_factory=list)

# ── lifecycle ─────────────────────────────────────────────────────

Expand All @@ -75,15 +138,45 @@ def cancel(self) -> None:
def set_phase(self, phase: ScanPhase) -> None:
self.state = ScanState(phase.value)

# ── findings ──────────────────────────────────────────────────────

def add_finding(
self,
severity: Severity,
title: str,
description: str,
agent: str,
target: Optional[str] = None,
cve: Optional[str] = None,
cve_ids: Optional[List[str]] = None,
evidence: Optional[List[Any]] = None,
data: Any = None,
) -> Finding:
f = Finding(
id = len(self.findings) + 1,
severity = severity,
title = title,
description = description,
timestamp = _now(),
agent = agent,
target = target or self.target,
cve = cve,
cve_ids = cve_ids or [],
evidence = evidence or [],
data = data,
)
self.findings.append(f)
return f

# ── phase tracking ────────────────────────────────────────────────

def record_phase(
self,
phase: ScanPhase,
success: bool,
started: str,
data: Dict[str, Any] = None,
error: str = None,
data: Optional[Dict[str, Any]] = None,
error: Optional[str] = None,
) -> PhaseResult:
ended = _now()
duration = _delta(started, ended)
Expand Down Expand Up @@ -113,28 +206,37 @@ def summary(self) -> Dict[str, Any]:
duration = None
if self.started_at and self.ended_at:
duration = round(_delta(self.started_at, self.ended_at), 1)

severity_counts = {s.value: 0 for s in Severity}
for f in self.findings:
severity_counts[f.severity.value] += 1

return {
"session_id": self.session_id,
"target": self.target,
"state": self.state.value,
"created_at": self.created_at,
"started_at": self.started_at,
"ended_at": self.ended_at,
"duration_s": duration,
"phases": [_phase_summary(p) for p in self.phases],
"errors": self.errors,
"kb_keys": list(self.kb.keys()),
"session_id": self.session_id,
"target": self.target,
"state": self.state.value,
"created_at": self.created_at,
"started_at": self.started_at,
"ended_at": self.ended_at,
"duration_s": duration,
"phases": [_phase_summary(p) for p in self.phases],
"findings_total": len(self.findings),
"severity_breakdown": severity_counts,
"errors": self.errors,
"kb_keys": list(self.kb.keys()),
}

def __repr__(self) -> str:
return (
f"ScanSession(id={self.session_id}, "
f"target={self.target}, state={self.state.value})"
f"target={self.target}, state={self.state.value}, "
f"findings={len(self.findings)})"
)


# ── helpers ───────────────────────────────────────────────────────────


def _now() -> str:
return datetime.now(timezone.utc).isoformat()

Expand Down
Loading
Loading