Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions lambda_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,22 @@ def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
raise RuntimeError(str(exc))

findings = pipeline(target, tools, use_pipeline=pipeline_mode, show_summary=False)
ai_text: str | None = None
if event.get("triage"):
from utils.analyst import analyse_findings

ai_text = analyse_findings(findings)
prefix = safe_filename_component(target)
json_path = write_json(findings, out_dir, prefix=prefix)

if report == "html":
write_html(findings, out_dir, prefix=prefix)
elif report == "pdf":
write_pdf(findings, out_dir, prefix=prefix)
write_pdf(findings, out_dir, prefix=prefix, ai_summary=ai_text)
elif report == "markdown":
write_markdown(findings, out_dir, prefix=prefix)
write_markdown(findings, out_dir, prefix=prefix, ai_summary=ai_text)
elif report == "summary":
write_markdown(findings, out_dir, prefix=prefix, summary_only=True)
write_markdown(findings, out_dir, prefix=prefix, summary_only=True, ai_summary=ai_text)

for notifier in notifiers:
try:
Expand Down
35 changes: 29 additions & 6 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,13 @@ def cli() -> None:
action="store_true",
help="Suppress final summary table",
)
parser.add_argument(
"--triage",
"--explain",
dest="triage",
action="store_true",
help="Send findings to an LLM for automated analysis",
)
parser.add_argument(
"--agent",
action="store_true",
Expand Down Expand Up @@ -379,16 +386,21 @@ def run_one(t: str) -> List[Finding]:
show_summary=not args.no_summary,
parallel=args.parallel,
)
ai_text: str | None = None
if args.triage:
from utils.analyst import analyse_findings

ai_text = analyse_findings(res)
prefix = safe_filename_component(t)
write_json(res, args.out, prefix=prefix)
if args.report == "html":
write_html(res, args.out, prefix=prefix)
elif args.report == "pdf":
write_pdf(res, args.out, prefix=prefix)
write_pdf(res, args.out, prefix=prefix, ai_summary=ai_text)
elif args.report == "markdown":
write_markdown(res, args.out, prefix=prefix)
write_markdown(res, args.out, prefix=prefix, ai_summary=ai_text)
elif args.report == "summary":
write_markdown(res, args.out, prefix=prefix, summary_only=True)
write_markdown(res, args.out, prefix=prefix, summary_only=True, ai_summary=ai_text)

for notifier in notifiers:
try:
Expand Down Expand Up @@ -423,15 +435,26 @@ def run_agent() -> None:
diff = _diff_findings(res, prev)
out = diff if args.diff_only else res
prefix = safe_filename_component(t)
ai_text: str | None = None
if args.triage:
from utils.analyst import analyse_findings

ai_text = analyse_findings(out)
write_json(out, args.out, prefix=prefix)
if args.report == "html":
write_html(out, args.out, prefix=prefix)
elif args.report == "pdf":
write_pdf(out, args.out, prefix=prefix)
write_pdf(out, args.out, prefix=prefix, ai_summary=ai_text)
elif args.report == "markdown":
write_markdown(out, args.out, prefix=prefix)
write_markdown(out, args.out, prefix=prefix, ai_summary=ai_text)
elif args.report == "summary":
write_markdown(out, args.out, prefix=prefix, summary_only=True)
write_markdown(
out,
args.out,
prefix=prefix,
summary_only=True,
ai_summary=ai_text,
)

for notifier in notifiers:
try:
Expand Down
56 changes: 56 additions & 0 deletions utils/analyst.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from __future__ import annotations

"""AI analyst integration for Pentest-Toolkit."""

import json
import os
from typing import List, Dict, Any

import requests

from utils.logger import get_logger
from modules.base import Finding

logger = get_logger()


class LLMClient:
"""Minimal client for local or remote LLMs."""

def __init__(self, model: str = "gpt-4"):
self.model = model
self.api_key = os.environ.get("OPENAI_API_KEY")
self.ollama_url = os.environ.get("OLLAMA_BASE_URL")
self.custom_endpoint = os.environ.get("LLM_ENDPOINT")

def analyse(self, findings: List[Finding]) -> str:
"""Return a summary string for *findings* using the configured LLM."""
payload = json.dumps([f.asdict() for f in findings], indent=2)
if self.ollama_url:
url = self.ollama_url.rstrip("/") + "/api/chat"
data = {"model": self.model, "messages": [{"role": "user", "content": payload}]}
return self._post(url, data)
endpoint = self.custom_endpoint or "https://api.openai.com/v1/chat/completions"
headers: Dict[str, str] = {}
if self.api_key:
headers["Authorization"] = f"Bearer {self.api_key}"
data = {"model": self.model, "messages": [{"role": "user", "content": payload}]}
return self._post(endpoint, data, headers=headers)

def _post(self, url: str, data: Dict[str, Any], headers: Dict[str, str] | None = None) -> str:
try:
resp = requests.post(url, json=data, headers=headers, timeout=60)
resp.raise_for_status()
out = resp.json()
if isinstance(out, dict) and out.get("choices"):
return out["choices"][0]["message"]["content"]
return resp.text
except Exception as exc: # noqa: BLE001
logger.warning("LLM request failed: %s", exc)
return "AI analysis failed"


def analyse_findings(findings: List[Finding]) -> str:
"""Convenience wrapper using :class:`LLMClient`."""
client = LLMClient()
return client.analyse(findings)
8 changes: 7 additions & 1 deletion utils/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ def write_markdown(
*,
prefix: str | None = None,
summary_only: bool = False,
ai_summary: str | None = None,
) -> Path:
"""Generate a Markdown report."""
out_dir.mkdir(exist_ok=True)
Expand Down Expand Up @@ -201,6 +202,10 @@ def write_markdown(
for r in recs:
lines.append(f"- {r}")

if ai_summary:
lines.append("\n## AI Analyst Summary")
lines.append(ai_summary)

if not summary_only:
lines.append("\n## Findings by Tool")
lines.append("| Tool | Data |")
Expand All @@ -221,8 +226,9 @@ def write_pdf(
out_dir: Path = Path("output"),
*,
prefix: str | None = None,
ai_summary: str | None = None,
) -> Path:
md_path = write_markdown(findings, out_dir, prefix=prefix)
md_path = write_markdown(findings, out_dir, prefix=prefix, ai_summary=ai_summary)
html = markdown(md_path.read_text(), extensions=["tables"])
pdf_path = md_path.with_suffix(".pdf")
HTML(string=html).write_pdf(str(pdf_path))
Expand Down