Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
* Extensible plugin system for custom modules
* Opt-in pipeline mode to feed Subfinder → Httpx → Nuclei → Nmap → TestSSL
* `--parallel` flag to run independent modules concurrently for speed
* Autonomous `--agent` mode with `--watch` and `--diff-only` for stateful
differential scanning

---

Expand Down
129 changes: 119 additions & 10 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from concurrent.futures import ThreadPoolExecutor
import json
import xml.etree.ElementTree as ET
import time

from rich.console import Console
from rich.table import Table
Expand All @@ -30,6 +31,7 @@
write_pdf,
write_markdown,
safe_filename_component,
timestamp,
)
from utils.notifiers import Notifier
from utils.deps import DependencyError, check_dependencies
Expand Down Expand Up @@ -86,6 +88,7 @@ def pipeline(
*,
show_summary: bool = False,
parallel: bool = False,
prev_state: List[Finding] | None = None,
) -> List[Finding]:
findings: List[Finding] = []

Expand Down Expand Up @@ -119,6 +122,16 @@ def _run_host_tool(tool_cls: type[Module], host_list: List[str], name: str) -> L
# Data passed between modules
hosts: List[str] = [target]
urls: List[str] = []
new_urls: List[str] = []
prev_hosts: set[str] = set()
prev_urls: set[str] = set()

if prev_state:
for f in prev_state:
if f.tool == "subfinder" and f.data.get("host"):
prev_hosts.add(f.data["host"])
elif f.tool == "httpx" and f.data.get("url"):
prev_urls.add(f.data["url"])

processed: set[str] = set()

Expand Down Expand Up @@ -164,11 +177,18 @@ def _run_host_tool(tool_cls: type[Module], host_list: List[str], name: str) -> L
if f.data.get("host") or f.data.get("ip")
]
summary["live_hosts"] += len(res)
new_urls = [u for u in urls if u not in prev_urls]
elif use_pipeline and tool_name == "nuclei":
if prev_state is not None and not new_urls:
logger.info("⏭️ nuclei skipped – no new hosts")
continue
res = tool_cls().run("\n".join(urls))
findings.extend(res)
summary["nuclei"] += len(res)
elif use_pipeline and tool_name in {"nmap", "testssl"}:
if prev_state is not None and not new_urls:
logger.info("⏭️ %s skipped – no new hosts", tool_name)
continue
if parallel and tool_name in {"nmap", "testssl"}:
others = {"nmap", "testssl"} - {tool_name}
batch = [tool_name] + [
Expand Down Expand Up @@ -222,6 +242,12 @@ def _collect_targets(args: argparse.Namespace) -> List[str]:
return targets


def _diff_findings(current: List[Finding], previous: List[Finding]) -> List[Finding]:
"""Return only new findings compared to *previous*."""
prev_set = {json.dumps(f.asdict(), sort_keys=True) for f in previous}
return [f for f in current if json.dumps(f.asdict(), sort_keys=True) not in prev_set]


def cli() -> None:
parser = argparse.ArgumentParser(
description="🛡️ Pentest-Toolkit – modular recon pipeline"
Expand Down Expand Up @@ -289,6 +315,33 @@ def cli() -> None:
action="store_true",
help="Suppress final summary table",
)
parser.add_argument(
"--agent",
action="store_true",
help="Enable autonomous agent mode with stateful diffs",
)
parser.add_argument(
"--watch",
action="store_true",
help="Continuously scan at set intervals (agent mode)",
)
parser.add_argument(
"--interval",
type=int,
default=3600,
help="Watch interval in seconds",
)
parser.add_argument(
"--state-dir",
type=Path,
default=Path("agent_state"),
help="Directory for agent state",
)
parser.add_argument(
"--diff-only",
action="store_true",
help="Only output new findings compared to previous scan",
)
args = parser.parse_args()

notify_names: List[str] = []
Expand Down Expand Up @@ -347,17 +400,73 @@ def run_one(t: str) -> List[Finding]:
logger.info("✅ Finished %s – %s findings", t, len(res))
return res

if args.parallel and len(targets) > 1:
with ThreadPoolExecutor() as pool:
for res in pool.map(run_one, targets):
all_findings.extend(res)
def run_agent() -> None:
args.state_dir.mkdir(exist_ok=True)

while True:
for t in targets:
logger.info("▶️ %s", t)
latest = args.state_dir / f"{safe_filename_component(t)}_latest.json"
prev: List[Finding] = []
if latest.exists():
prev_data = json.loads(latest.read_text())
prev = [Finding(tool=d.pop("tool"), data=d) for d in prev_data]

res = pipeline(
t,
args.tools,
use_pipeline=args.pipeline,
show_summary=not args.no_summary,
parallel=args.parallel,
prev_state=prev,
)
diff = _diff_findings(res, prev)
out = diff if args.diff_only else res
prefix = safe_filename_component(t)
write_json(out, args.out, prefix=prefix)
if args.report == "html":
write_html(out, args.out, prefix=prefix)
elif args.report == "pdf":
write_pdf(out, args.out, prefix=prefix)
elif args.report == "markdown":
write_markdown(out, args.out, prefix=prefix)
elif args.report == "summary":
write_markdown(out, args.out, prefix=prefix, summary_only=True)

for notifier in notifiers:
try:
notifier.send(out)
except Exception as exc: # noqa: BLE001
logger.error("❌ Notifier error: %s", exc)
if args.strict_notify:
raise SystemExit(1)

ts_name = f"{safe_filename_component(t)}_{timestamp()}.json"
(args.state_dir / ts_name).write_text(
json.dumps([f.asdict() for f in res], indent=2)
)
latest.write_text(json.dumps([f.asdict() for f in res], indent=2))
logger.info("✅ Finished %s – %s findings", t, len(res))

if not args.watch:
break
logger.info("⏳ Sleeping %s seconds", args.interval)
time.sleep(args.interval)

if args.agent:
run_agent()
else:
for t in targets:
all_findings.extend(run_one(t))

logger.info(
"✅ Completed %s targets – %s findings collected", len(targets), len(all_findings)
)
if args.parallel and len(targets) > 1:
with ThreadPoolExecutor() as pool:
for res in pool.map(run_one, targets):
all_findings.extend(res)
else:
for t in targets:
all_findings.extend(run_one(t))

logger.info(
"✅ Completed %s targets – %s findings collected", len(targets), len(all_findings)
)


if __name__ == "__main__":
Expand Down
46 changes: 46 additions & 0 deletions tests/test_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import importlib
from pathlib import Path
import sys

sys.path.append(str(Path(__file__).resolve().parents[1]))

from main import _diff_findings, pipeline
from modules.base import Module, Finding


def test_diff_findings():
old = [Finding(tool="a", data={"x": 1})]
new = [Finding(tool="a", data={"x": 1}), Finding(tool="b", data={"y": 2})]
diff = _diff_findings(new, old)
assert len(diff) == 1
assert diff[0].tool == "b"


def test_pipeline_skip(monkeypatch):
import main
importlib.reload(main)
monkeypatch.setattr(Module, "registry", {})
monkeypatch.setattr(main.Module, "registry", Module.registry)

called = []

class Sub(Module):
name = "subfinder"
def run(self, target: str):
return [Finding(tool=self.name, data={"host": "a.example"})]

class Hx(Module):
name = "httpx"
def run(self, target: str):
return [Finding(tool=self.name, data={"url": "https://a.example", "host": "a.example"})]

class Nuc(Module):
name = "nuclei"
def run(self, target: str):
called.append("nuclei")
return [Finding(tool=self.name, data={})]

prev = [Finding(tool="httpx", data={"url": "https://a.example"})]
modules = ["subfinder", "httpx", "nuclei"]
pipeline("example.com", modules, use_pipeline=True, prev_state=prev)
assert not called