diff --git a/README.md b/README.md index 385913c..c74d7ab 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,8 @@ * Extensible plugin system for custom modules * Opt-in pipeline mode to feed Subfinder → Httpx → Nuclei → Nmap → TestSSL * `--parallel` flag to run independent modules concurrently for speed +* Autonomous `--agent` mode with `--watch` and `--diff-only` for stateful + differential scanning --- diff --git a/main.py b/main.py index 19c36bb..04a5d12 100644 --- a/main.py +++ b/main.py @@ -17,6 +17,7 @@ from concurrent.futures import ThreadPoolExecutor import json import xml.etree.ElementTree as ET +import time from rich.console import Console from rich.table import Table @@ -30,6 +31,7 @@ write_pdf, write_markdown, safe_filename_component, + timestamp, ) from utils.notifiers import Notifier from utils.deps import DependencyError, check_dependencies @@ -86,6 +88,7 @@ def pipeline( *, show_summary: bool = False, parallel: bool = False, + prev_state: List[Finding] | None = None, ) -> List[Finding]: findings: List[Finding] = [] @@ -119,6 +122,16 @@ def _run_host_tool(tool_cls: type[Module], host_list: List[str], name: str) -> L # Data passed between modules hosts: List[str] = [target] urls: List[str] = [] + new_urls: List[str] = [] + prev_hosts: set[str] = set() + prev_urls: set[str] = set() + + if prev_state: + for f in prev_state: + if f.tool == "subfinder" and f.data.get("host"): + prev_hosts.add(f.data["host"]) + elif f.tool == "httpx" and f.data.get("url"): + prev_urls.add(f.data["url"]) processed: set[str] = set() @@ -164,11 +177,18 @@ def _run_host_tool(tool_cls: type[Module], host_list: List[str], name: str) -> L if f.data.get("host") or f.data.get("ip") ] summary["live_hosts"] += len(res) + new_urls = [u for u in urls if u not in prev_urls] elif use_pipeline and tool_name == "nuclei": + if prev_state is not None and not new_urls: + logger.info("⏭️ nuclei skipped – no new hosts") + continue res = tool_cls().run("\n".join(urls)) findings.extend(res) 
summary["nuclei"] += len(res) elif use_pipeline and tool_name in {"nmap", "testssl"}: + if prev_state is not None and not new_urls: + logger.info("⏭️ %s skipped – no new hosts", tool_name) + continue if parallel and tool_name in {"nmap", "testssl"}: others = {"nmap", "testssl"} - {tool_name} batch = [tool_name] + [ @@ -222,6 +242,12 @@ def _collect_targets(args: argparse.Namespace) -> List[str]: return targets +def _diff_findings(current: List[Finding], previous: List[Finding]) -> List[Finding]: + """Return only new findings compared to *previous*.""" + prev_set = {json.dumps(f.asdict(), sort_keys=True) for f in previous} + return [f for f in current if json.dumps(f.asdict(), sort_keys=True) not in prev_set] + + def cli() -> None: parser = argparse.ArgumentParser( description="🛡️ Pentest-Toolkit – modular recon pipeline" @@ -289,6 +315,33 @@ def cli() -> None: action="store_true", help="Suppress final summary table", ) + parser.add_argument( + "--agent", + action="store_true", + help="Enable autonomous agent mode with stateful diffs", + ) + parser.add_argument( + "--watch", + action="store_true", + help="Continuously scan at set intervals (agent mode)", + ) + parser.add_argument( + "--interval", + type=int, + default=3600, + help="Watch interval in seconds", + ) + parser.add_argument( + "--state-dir", + type=Path, + default=Path("agent_state"), + help="Directory for agent state", + ) + parser.add_argument( + "--diff-only", + action="store_true", + help="Only output new findings compared to previous scan", + ) args = parser.parse_args() notify_names: List[str] = [] @@ -347,17 +400,73 @@ def run_one(t: str) -> List[Finding]: logger.info("✅ Finished %s – %s findings", t, len(res)) return res - if args.parallel and len(targets) > 1: - with ThreadPoolExecutor() as pool: - for res in pool.map(run_one, targets): - all_findings.extend(res) + def run_agent() -> None: + args.state_dir.mkdir(exist_ok=True) + + while True: + for t in targets: + logger.info("▶️ %s", t) + latest 
= args.state_dir / f"{safe_filename_component(t)}_latest.json" + prev: List[Finding] = [] + if latest.exists(): + prev_data = json.loads(latest.read_text()) + prev = [Finding(tool=d.pop("tool"), data=d) for d in prev_data] + + res = pipeline( + t, + args.tools, + use_pipeline=args.pipeline, + show_summary=not args.no_summary, + parallel=args.parallel, + prev_state=prev, + ) + diff = _diff_findings(res, prev) + out = diff if args.diff_only else res + prefix = safe_filename_component(t) + write_json(out, args.out, prefix=prefix) + if args.report == "html": + write_html(out, args.out, prefix=prefix) + elif args.report == "pdf": + write_pdf(out, args.out, prefix=prefix) + elif args.report == "markdown": + write_markdown(out, args.out, prefix=prefix) + elif args.report == "summary": + write_markdown(out, args.out, prefix=prefix, summary_only=True) + + for notifier in notifiers: + try: + notifier.send(out) + except Exception as exc: # noqa: BLE001 + logger.error("❌ Notifier error: %s", exc) + if args.strict_notify: + raise SystemExit(1) + + ts_name = f"{safe_filename_component(t)}_{timestamp()}.json" + (args.state_dir / ts_name).write_text( + json.dumps([f.asdict() for f in res], indent=2) + ) + latest.write_text(json.dumps([f.asdict() for f in res], indent=2)) + logger.info("✅ Finished %s – %s findings", t, len(res)) + + if not args.watch: + break + logger.info("⏳ Sleeping %s seconds", args.interval) + time.sleep(args.interval) + + if args.agent: + run_agent() else: - for t in targets: - all_findings.extend(run_one(t)) - - logger.info( - "✅ Completed %s targets – %s findings collected", len(targets), len(all_findings) - ) + if args.parallel and len(targets) > 1: + with ThreadPoolExecutor() as pool: + for res in pool.map(run_one, targets): + all_findings.extend(res) + else: + for t in targets: + all_findings.extend(run_one(t)) + + logger.info( + "✅ Completed %s targets – %s findings collected", len(targets), len(all_findings) + ) if __name__ == "__main__": diff --git 
"""Tests for agent mode: finding diffs and skipping nuclei when nothing is new."""
import importlib
from pathlib import Path
import sys

# Make the repository root importable so ``main`` and ``modules`` resolve.
sys.path.append(str(Path(__file__).resolve().parents[1]))

from main import _diff_findings, pipeline
from modules.base import Module, Finding


def test_diff_findings():
    """Only the finding that is missing from the previous scan is reported."""
    previous = [Finding(tool="a", data={"x": 1})]
    current = [
        Finding(tool="a", data={"x": 1}),
        Finding(tool="b", data={"y": 2}),
    ]

    fresh = _diff_findings(current, previous)

    assert len(fresh) == 1
    assert fresh[0].tool == "b"


def test_pipeline_skip(monkeypatch):
    """nuclei must not run when httpx reports only an already-known URL."""
    import main

    importlib.reload(main)
    # Start from an empty registry for the stub modules defined below.
    monkeypatch.setattr(Module, "registry", {})
    monkeypatch.setattr(main.Module, "registry", Module.registry)

    nuclei_runs = []

    class Sub(Module):
        name = "subfinder"

        def run(self, target: str):
            return [Finding(tool=self.name, data={"host": "a.example"})]

    class Hx(Module):
        name = "httpx"

        def run(self, target: str):
            found = {"url": "https://a.example", "host": "a.example"}
            return [Finding(tool=self.name, data=found)]

    class Nuc(Module):
        name = "nuclei"

        def run(self, target: str):
            nuclei_runs.append("nuclei")
            return [Finding(tool=self.name, data={})]

    # The previous scan already knows the only URL httpx will report.
    known_state = [Finding(tool="httpx", data={"url": "https://a.example"})]
    pipeline(
        "example.com",
        ["subfinder", "httpx", "nuclei"],
        use_pipeline=True,
        prev_state=known_state,
    )

    assert not nuclei_runs