diff --git a/lambda_function.py b/lambda_function.py index e3e1603..1ef31c9 100644 --- a/lambda_function.py +++ b/lambda_function.py @@ -58,17 +58,22 @@ def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]: raise RuntimeError(str(exc)) findings = pipeline(target, tools, use_pipeline=pipeline_mode, show_summary=False) + ai_text: str | None = None + if event.get("triage"): + from utils.analyst import analyse_findings + + ai_text = analyse_findings(findings) prefix = safe_filename_component(target) json_path = write_json(findings, out_dir, prefix=prefix) if report == "html": write_html(findings, out_dir, prefix=prefix) elif report == "pdf": - write_pdf(findings, out_dir, prefix=prefix) + write_pdf(findings, out_dir, prefix=prefix, ai_summary=ai_text) elif report == "markdown": - write_markdown(findings, out_dir, prefix=prefix) + write_markdown(findings, out_dir, prefix=prefix, ai_summary=ai_text) elif report == "summary": - write_markdown(findings, out_dir, prefix=prefix, summary_only=True) + write_markdown(findings, out_dir, prefix=prefix, summary_only=True, ai_summary=ai_text) for notifier in notifiers: try: diff --git a/main.py b/main.py index 04a5d12..49c54a8 100644 --- a/main.py +++ b/main.py @@ -315,6 +315,13 @@ def cli() -> None: action="store_true", help="Suppress final summary table", ) + parser.add_argument( + "--triage", + "--explain", + dest="triage", + action="store_true", + help="Send findings to an LLM for automated analysis", + ) parser.add_argument( "--agent", action="store_true", @@ -379,16 +386,21 @@ def run_one(t: str) -> List[Finding]: show_summary=not args.no_summary, parallel=args.parallel, ) + ai_text: str | None = None + if args.triage: + from utils.analyst import analyse_findings + + ai_text = analyse_findings(res) prefix = safe_filename_component(t) write_json(res, args.out, prefix=prefix) if args.report == "html": write_html(res, args.out, prefix=prefix) elif args.report == "pdf": - write_pdf(res, args.out, prefix=prefix) + write_pdf(res, args.out, prefix=prefix, ai_summary=ai_text) elif args.report == "markdown": - write_markdown(res, args.out, prefix=prefix) + write_markdown(res, args.out, prefix=prefix, ai_summary=ai_text) elif args.report == "summary": - write_markdown(res, args.out, prefix=prefix, summary_only=True) + write_markdown(res, args.out, prefix=prefix, summary_only=True, ai_summary=ai_text) for notifier in notifiers: try: @@ -423,15 +435,26 @@ def run_agent() -> None: diff = _diff_findings(res, prev) out = diff if args.diff_only else res prefix = safe_filename_component(t) + ai_text: str | None = None + if args.triage: + from utils.analyst import analyse_findings + + ai_text = analyse_findings(out) write_json(out, args.out, prefix=prefix) if args.report == "html": write_html(out, args.out, prefix=prefix) elif args.report == "pdf": - write_pdf(out, args.out, prefix=prefix) + write_pdf(out, args.out, prefix=prefix, ai_summary=ai_text) elif args.report == "markdown": - write_markdown(out, args.out, prefix=prefix) + write_markdown(out, args.out, prefix=prefix, ai_summary=ai_text) elif args.report == "summary": - write_markdown(out, args.out, prefix=prefix, summary_only=True) + write_markdown( + out, + args.out, + prefix=prefix, + summary_only=True, + ai_summary=ai_text, + ) for notifier in notifiers: try: diff --git a/utils/analyst.py b/utils/analyst.py new file mode 100644 index 0000000..1341dbf --- /dev/null +++ b/utils/analyst.py @@ -0,0 +1,56 @@ +from __future__ import annotations + +"""AI analyst integration for Pentest-Toolkit.""" + +import json +import os +from typing import List, Dict, Any + +import requests + +from utils.logger import get_logger +from modules.base import Finding + +logger = get_logger() + + +class LLMClient: + """Minimal client for local or remote LLMs.""" + + def __init__(self, model: str = "gpt-4"): + self.model = model + self.api_key = os.environ.get("OPENAI_API_KEY") + self.ollama_url = os.environ.get("OLLAMA_BASE_URL") + self.custom_endpoint = os.environ.get("LLM_ENDPOINT") + + def analyse(self, findings: List[Finding]) -> str: + """Return a summary string for *findings* using the configured LLM.""" + payload = json.dumps([f.asdict() for f in findings], indent=2) + if self.ollama_url: + url = self.ollama_url.rstrip("/") + "/api/chat" + data = {"model": self.model, "messages": [{"role": "user", "content": payload}]} + return self._post(url, data) + endpoint = self.custom_endpoint or "https://api.openai.com/v1/chat/completions" + headers: Dict[str, str] = {} + if self.api_key: + headers["Authorization"] = f"Bearer {self.api_key}" + data = {"model": self.model, "messages": [{"role": "user", "content": payload}]} + return self._post(endpoint, data, headers=headers) + + def _post(self, url: str, data: Dict[str, Any], headers: Dict[str, str] | None = None) -> str: + try: + resp = requests.post(url, json=data, headers=headers, timeout=60) + resp.raise_for_status() + out = resp.json() + if isinstance(out, dict) and out.get("choices"): + return out["choices"][0]["message"]["content"] + return resp.text + except Exception as exc: # noqa: BLE001 + logger.warning("LLM request failed: %s", exc) + return "AI analysis failed" + + +def analyse_findings(findings: List[Finding]) -> str: + """Convenience wrapper using :class:`LLMClient`.""" + client = LLMClient() + return client.analyse(findings) diff --git a/utils/output.py b/utils/output.py index c4fcdf8..140ee80 100644 --- a/utils/output.py +++ b/utils/output.py @@ -168,6 +168,7 @@ def write_markdown( *, prefix: str | None = None, summary_only: bool = False, + ai_summary: str | None = None, ) -> Path: """Generate a Markdown report.""" out_dir.mkdir(exist_ok=True) @@ -201,6 +202,10 @@ def write_markdown( for r in recs: lines.append(f"- {r}") + if ai_summary: + lines.append("\n## AI Analyst Summary") + lines.append(ai_summary) + if not summary_only: lines.append("\n## Findings by Tool") lines.append("| Tool | Data |") @@ -221,8 +226,9 @@ def write_pdf( out_dir: Path = Path("output"), *, prefix: str | None = None, + ai_summary: str | None = None, ) -> Path: - md_path = write_markdown(findings, out_dir, prefix=prefix) + md_path = write_markdown(findings, out_dir, prefix=prefix, ai_summary=ai_summary) html = markdown(md_path.read_text(), extensions=["tables"]) pdf_path = md_path.with_suffix(".pdf") HTML(string=html).write_pdf(str(pdf_path))