Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 154 additions & 25 deletions cyberai/core/base_agent.py
Original file line number Diff line number Diff line change
@@ -1,61 +1,190 @@
"""
BaseAgent — abstract base for all CyberAI agents.

Day 4 of STANDOFF rewrite: this redesign closes KI-3, KI-4, KI-6.

Every agent now receives explicit dependencies (config, session, llm,
audit) and exposes the attributes agents actually use:
self.config, self.session, self.kb, self.llm, self.audit, self.memory
plus helper methods _check_iteration_limit() and _log().
"""
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any, Dict, Callable
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional

from rich.console import Console

from .config import CyberAIConfig
from .knowledge_base import KnowledgeBase
from .logger import AuditLogger
from rich.console import Console

if TYPE_CHECKING:
from .llm_client import LLMClient
from .scan_session import ScanSession

console = Console()


# ── Tool ──────────────────────────────────────────────────────────────


@dataclass
class Tool:
name: str
"""
A callable capability an agent can invoke.

`params` is the canonical field. `parameters` is accepted as an
alias for backward compatibility — all existing agents register
tools with `parameters=...` (KI-6). Pass either; they are kept
in sync.
"""
name: str
description: str
func: Callable
params: Dict[str, str] = field(default_factory=dict)
func: Callable
params: Dict[str, str] = field(default_factory=dict)
parameters: Optional[Dict[str, str]] = None

def __post_init__(self) -> None:
# KI-6: agents pass parameters=...; mirror it into params.
if self.parameters is not None and not self.params:
self.params = self.parameters
# Keep parameters readable as an alias too.
self.parameters = self.params


# ── AgentMemory ───────────────────────────────────────────────────────


class AgentMemory:
"""
Minimal multi-turn conversation memory for agents that talk to an LLM
across several steps (ExploitAgent in particular — KI-4).
"""

def __init__(self) -> None:
self._messages: List[Dict[str, str]] = []
self._system: Optional[str] = None

def add(self, role: str, content: str) -> None:
"""Add a message. role='system' is stored separately."""
if role == "system":
self._system = content
else:
self._messages.append({"role": role, "content": content})

def to_messages(self) -> List[Dict[str, str]]:
"""Return the message list (excluding system) for an LLM call."""
return list(self._messages)

@property
def system(self) -> Optional[str]:
return self._system

def clear(self) -> None:
self._messages.clear()
self._system = None


# ── AgentIterationLimitError ──────────────────────────────────────────


class AgentIterationLimitError(RuntimeError):
"""Raised when an agent exceeds config.max_agent_iterations."""


# ── BaseAgent ─────────────────────────────────────────────────────────


class BaseAgent(ABC):
"""
Abstract base class for all CyberAI agents.
Each agent: has a role, a tool registry, and access to shared KB.

Agents are constructed with explicit dependencies so they are easy
to test (everything is injectable / mockable):

agent = ReconAgent(config, session, llm, audit)
result = agent.run(target)
"""

AGENT_NAME: str = "base"
ROLE: str = "Generic Agent"

def __init__(
self,
config: CyberAIConfig,
audit: AuditLogger,
session_id: str = "unknown"
):
self.config = config
self.audit = audit
self.session_id = session_id
config: CyberAIConfig,
session: "ScanSession",
llm: Optional["LLMClient"] = None,
audit: Optional[AuditLogger] = None,
) -> None:
self.config = config
self.session = session
self.llm = llm
# KB is taken from the session if present, else a fresh one.
self.kb: KnowledgeBase = getattr(session, "kb", None) or KnowledgeBase()
if not isinstance(self.kb, KnowledgeBase):
# legacy ScanSession.kb may be a plain dict — wrap it
self.kb = KnowledgeBase()
self.audit = audit or AuditLogger(
session_id=getattr(session, "session_id", "unknown")
)
self.memory = AgentMemory()

self.tools: Dict[str, Tool] = {}
self._iterations: int = 0

self._register_tools()

def register_tool(self, tool: Tool):
# ── tool registry ─────────────────────────────────────────────────

def register_tool(self, tool: Tool) -> None:
self.tools[tool.name] = tool

@abstractmethod
def _register_tools(self):
"""Register agent-specific tools"""
pass
def _register_tools(self) -> None:
"""Register agent-specific tools."""

@abstractmethod
def run(self, target: str, context: Dict[str, Any] = None) -> Dict[str, Any]:
"""Main agent execution — returns findings dict"""
pass
def run(self, target: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Main agent execution — returns a result dict."""

def call_tool(self, tool_name: str, **kwargs) -> Any:
def call_tool(self, tool_name: str, **kwargs: Any) -> Any:
if tool_name not in self.tools:
raise ValueError(f"Tool '{tool_name}' not registered in {self.AGENT_NAME}")
raise ValueError(
f"Tool '{tool_name}' not registered in {self.AGENT_NAME}"
)
tool = self.tools[tool_name]
self.audit.agent_action(self.AGENT_NAME, f"calling tool: {tool_name}", kwargs)
console.print(f"[dim cyan][{self.AGENT_NAME}] → {tool_name}[/dim cyan]")
result = tool.func(**kwargs)
return result
return tool.func(**kwargs)

# ── iteration safety ──────────────────────────────────────────────

def _check_iteration_limit(self) -> None:
"""
Increment the step counter and raise if the agent has exceeded
config.max_agent_iterations. Called by agents before each major
step to prevent runaway loops (KI-4).
"""
self._iterations += 1
limit = getattr(self.config, "max_agent_iterations", 10)
if self._iterations > limit:
raise AgentIterationLimitError(
f"{self.AGENT_NAME} exceeded {limit} iterations"
)

def log(self, msg: str, data: Any = None):
# ── logging ───────────────────────────────────────────────────────

def log(self, msg: str, data: Any = None) -> None:
"""Structured log + console echo."""
self.audit.agent_action(self.AGENT_NAME, msg, data)
console.print(f"[cyan][{self.AGENT_NAME}][/cyan] {msg}")

def _log(self, msg: str, data: Any = None) -> None:
"""
Alias for log(). Several agents call self._log(...) (KI-4).
Some legacy call sites pass (event, data) — both forms work
since the first arg is just the message string.
"""
self.log(msg, data)
63 changes: 51 additions & 12 deletions cyberai/core/knowledge_base.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,54 @@
from typing import Any, Dict, List, Optional
"""
KnowledgeBase — shared memory store for all agents in a session.

Day 4 of STANDOFF: `agent` is now optional (defaults to "unknown") so
agents can write quick entries without always naming themselves; the
mutable default `tags=[]` bug is fixed; datetime is timezone-aware.
"""
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional


def _now() -> str:
return datetime.now(timezone.utc).isoformat()


@dataclass
class KBEntry:
key: str
value: Any
agent: str
timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
tags: List[str] = field(default_factory=list)
key: str
value: Any
agent: str = "unknown"
timestamp: str = field(default_factory=_now)
tags: List[str] = field(default_factory=list)


class KnowledgeBase:
"""
Shared memory store for all agents in a session.
Agents read/write through trust-validated keys.
"""
def __init__(self):

def __init__(self) -> None:
self._store: Dict[str, KBEntry] = {}
self._history: List[KBEntry] = []

def set(self, key: str, value: Any, agent: str, tags: List[str] = []):
entry = KBEntry(key=key, value=value, agent=agent, tags=tags)
def set(
self,
key: str,
value: Any,
agent: str = "unknown",
tags: Optional[List[str]] = None,
) -> None:
entry = KBEntry(key=key, value=value, agent=agent, tags=tags or [])
self._store[key] = entry
self._history.append(entry)

def get(self, key: str) -> Optional[Any]:
def get(self, key: str, default: Any = None) -> Optional[Any]:
entry = self._store.get(key)
return entry.value if entry else None
return entry.value if entry else default

def get_by_tag(self, tag: str) -> Dict[str, Any]:
return {
Expand All @@ -45,3 +67,20 @@ def history(self) -> List[Dict]:
{"key": e.key, "agent": e.agent, "timestamp": e.timestamp}
for e in self._history
]

# ── dict-like access ──────────────────────────────────────────────
# Some agents treat the KB like a dict (kb["recon.nmap"]).

def __contains__(self, key: str) -> bool:
return key in self._store

def __getitem__(self, key: str) -> Any:
if key not in self._store:
raise KeyError(key)
return self._store[key].value

def __setitem__(self, key: str, value: Any) -> None:
self.set(key, value)

def __len__(self) -> int:
return len(self._store)
51 changes: 25 additions & 26 deletions docs/architecture/known-issues.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,26 @@ neither matches the actual API. **Fixed by:** Day 5.

### 🟢 KI-2 — Two competing session classes ✅ FIXED IN DAY 3
`scan_session.py` is now the single source of truth. `session.py` is a
backward-compat shim where `PentestSession` is a subclass of
`ScanSession` preserving legacy attributes. All 8 import sites work
unchanged. Verified by `tests/unit/test_session_shim.py`.
backward-compat shim. Verified by `tests/unit/test_session_shim.py`.

### 🔴 KI-3 — BaseAgent doesn't match what agents use
Agents access `self.session`, `self.kb`, `self.memory`, `self.llm` —
none exist on `BaseAgent`. **Fixed by:** Day 4.
### 🟢 KI-3 — BaseAgent didn't match what agents use ✅ FIXED IN DAY 4
`BaseAgent.__init__` now takes `(config, session, llm, audit)` and
exposes `self.session`, `self.kb`, `self.llm`, `self.memory`. Agents are
migrated to actually use this contract in day 6. Verified by
`tests/unit/test_base_agent.py`.

### 🔴 KI-4 — Agents call non-existent methods
`self._check_iteration_limit()`, `self._log()`, `self.llm.chat()` —
none exist. **Fixed by:** Day 4 + Day 6.
### 🟢 KI-4 — Agents called non-existent methods ✅ FIXED IN DAY 4
`_check_iteration_limit()` and `_log()` now exist on `BaseAgent`.
`AgentMemory` (with `add()`/`to_messages()`) backs `self.memory`.
`self.llm.chat()` is addressed in day 6 when ExploitAgent is migrated to
`self.llm.call()`. Verified by `tests/unit/test_base_agent.py`.

### 🟢 KI-5 — Finding signature mismatch ✅ FIXED IN DAY 3
`Finding` now has `target`, `evidence`, `cve_ids` fields with
backward-compat `cve` ↔ `cve_ids` syncing. `ScanSession.add_finding()`
auto-fills `target` from `session.target`. Verified by
`tests/unit/test_finding_model.py`.
### 🔴 KI-5 — Finding signature mismatch ✅ FIXED IN DAY 3

### 🔴 KI-6 — `Tool` param name mismatch
`Tool` field is `params`, agents register with `parameters=`.
**Fixed by:** Day 4.
### 🟢 KI-6 — Tool param name mismatch ✅ FIXED IN DAY 4
`Tool` accepts both `params` and `parameters`, synced via
`__post_init__`. All agents register tools with `parameters=...` so this
closed without touching any agent file.

### 🔴 KI-7 — `LLMClient.chat()` doesn't exist
Actual method is `call()`. **Fixed by:** Day 6.
Expand All @@ -40,12 +39,12 @@ Actual method is `call()`. **Fixed by:** Day 6.

## Progress tracker

| Day | Issue(s) addressed | Status |
|-----|-------------------|--------|
| 1 | (rebrand only) | ✅ |
| 2 | KI-8 | ✅ |
| 3 | KI-2, KI-5 | ✅ |
| 4 | KI-3, KI-4, KI-6 | ⏳ |
| 5 | KI-1 | ⏳ |
| 6 | KI-7, KI-4 | ⏳ |
| 7 | All checked | ⏳ |
| Day | Issue(s) addressed | Status |
|-----|----------------------|--------|
| 1 | (rebrand only) | ✅ |
| 2 | KI-8 | ✅ |
| 3 | KI-2, KI-5 | ✅ |
| 4 | KI-3, KI-4, KI-6 | ✅ |
| 5 | KI-1 | ⏳ |
| 6 | KI-7, KI-4 (llm.chat)| ⏳ |
| 7 | All checked | ⏳ |
Loading
Loading