diff --git a/PLUGINS.md b/PLUGINS.md new file mode 100644 index 0000000..6e63ebc --- /dev/null +++ b/PLUGINS.md @@ -0,0 +1,33 @@ +# Plugin Development + +Pentest-Toolkit supports extending both scanning modules and notification sinks. +Drop Python files into a `plugins/` directory or set `PENTEST_TOOLKIT_PLUGIN_DIR` +to point elsewhere. Any subclasses of `Module` or `Notifier` discovered in these +files are automatically registered. + +## Example Module + +```python +from modules.base import Module, Finding + +class DirBuster(Module): + name = "dirbuster" + def run(self, target: str): + return [Finding(tool=self.name, data={"target": target})] +``` + +## Example Notifier + +```python +from utils.notifiers import Notifier + +class FileNotifier(Notifier): + name = "file" + def __init__(self, path="results.log"): + self.path = path + def send(self, findings): + open(self.path, "a").write(str(findings)) +``` + +Enable with `--notify file` (and provide any config via environment variables or +custom flags). diff --git a/README.md b/README.md index 6411c70..385913c 100644 --- a/README.md +++ b/README.md @@ -117,8 +117,9 @@ trivial. ] ``` -Results are stored in `output/` with timestamps. They can also be pushed to -Slack or Microsoft Teams using `--webhook --webhook-type slack|teams`. +Results are stored in `output/` with timestamps. Notifications can be sent via +`--notify ` where `` is one of `slack`, `teams`, `discord`, +`telegram`, `elasticsearch` or `splunk`. --- @@ -126,7 +127,8 @@ Slack or Microsoft Teams using `--webhook --webhook-type slack|teams`. Drop Python files into a `plugins/` directory or set the `PENTEST_TOOLKIT_PLUGIN_DIR` environment variable. Each plugin defines a -`Module` subclass that automatically registers when imported. +`Module` or `Notifier` subclass that automatically registers when imported. +See [PLUGINS.md](PLUGINS.md) for more examples. ```python # plugins/dirbuster.py diff --git a/lambda_function.py b/lambda_function.py index 849f5d6..e3e1603 100644 --- a/lambda_function.py +++ b/lambda_function.py @@ -7,9 +7,8 @@ from modules.base import Module from utils.deps import check_dependencies, DependencyError -import requests from utils.logger import get_logger -from utils.notify import notify_slack, notify_teams +from utils.notifiers import Notifier from utils.output import ( write_html, write_json, @@ -32,22 +31,26 @@ def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]: - ``target``: domain or IP address to scan (required) - ``tools``: optional list of tools - ``report``: ``"html"``, ``"pdf"``, ``"markdown"`` or ``"summary"`` - - ``webhook``: optional incoming webhook URL - - ``webhook_type``: ``"slack"`` or ``"teams"`` (default ``"slack"``) + - ``notify``: list of notifier names (e.g. ``["slack"]``) - ``auto_install``: attempt to install missing binaries (bool) """ target = event["target"] tools: List[str] = event.get("tools", list(Module.registry.keys())) report = event.get("report") - webhook = event.get("webhook") - webhook_type = event.get("webhook_type", "slack") - strict_webhook = event.get("strict_webhook", False) + notify_names: List[str] = event.get("notify", []) + strict_notify = event.get("strict_notify", False) auto_install = event.get("auto_install", False) pipeline_mode = event.get("pipeline", False) out_dir = Path("/tmp/output") + notifiers = [ + Notifier.registry[name](strict=strict_notify) + for name in notify_names + if name in Notifier.registry + ] + try: check_dependencies(tools, auto_install=auto_install) except DependencyError as exc: @@ -67,14 +70,12 @@ def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]: elif report == "summary": write_markdown(findings, out_dir, prefix=prefix, summary_only=True) - if webhook: + for notifier in notifiers: try: - if webhook_type == "teams": - notify_teams(findings, webhook, strict=strict_webhook) - else: - notify_slack(findings, webhook, strict=strict_webhook) - except requests.RequestException as exc: # noqa: BLE001 - logger.error("❌ Webhook error: %s", exc) - raise RuntimeError(str(exc)) + notifier.send(findings) + except Exception as exc: # noqa: BLE001 + logger.error("❌ Notifier error: %s", exc) + if strict_notify: + raise RuntimeError(str(exc)) return {"count": len(findings), "json": str(json_path)} diff --git a/main.py b/main.py index 067a0f9..19c36bb 100644 --- a/main.py +++ b/main.py @@ -11,6 +11,7 @@ from __future__ import annotations import argparse +import os from pathlib import Path from typing import List from concurrent.futures import ThreadPoolExecutor @@ -22,7 +23,6 @@ from modules.base import Module, Finding import modules # noqa: F401 # populate Module.registry -import requests from utils.logger import get_logger from utils.output import ( write_json, @@ -31,7 +31,7 @@ write_markdown, safe_filename_component, ) -from utils.notify import notify_slack, notify_teams +from utils.notifiers import Notifier from utils.deps import DependencyError, check_dependencies from utils.plugins import load_plugins @@ -264,19 +264,15 @@ def cli() -> None: help="Generate an HTML, Markdown, PDF or summary report in addition to JSON", ) parser.add_argument( - "--webhook", - help="Slack or Teams webhook URL to post results", + "--notify", + action="append", + choices=list(Notifier.registry.keys()), + help="Enable one or more notifiers", ) parser.add_argument( - "--webhook-type", - choices=["slack", "teams"], - default="slack", - help="Webhook service type (default: slack)", - ) - parser.add_argument( - "--strict-webhook", + "--strict-notify", action="store_true", - help="Exit with error if webhook notification fails", + help="Exit with error if notifier fails", ) parser.add_argument( "--pipeline", @@ -295,6 +291,20 @@ def cli() -> None: ) args = parser.parse_args() + notify_names: List[str] = [] + if args.notify: + notify_names.extend(args.notify) + else: + env = os.environ.get("PENTEST_TOOLKIT_NOTIFY") + if env: + notify_names.extend([n.strip() for n in env.split(",") if n.strip()]) + + notifiers = [ + Notifier.registry[name](strict=args.strict_notify) + for name in notify_names + if name in Notifier.registry + ] + targets = _collect_targets(args) logger.info("🎯 Targets: %s", ", ".join(targets)) logger.info("🧰 Tools : %s", ", ".join(args.tools)) @@ -327,15 +337,12 @@ def run_one(t: str) -> List[Finding]: elif args.report == "summary": write_markdown(res, args.out, prefix=prefix, summary_only=True) - if args.webhook: + for notifier in notifiers: try: - if args.webhook_type == "slack": - notify_slack(res, args.webhook, strict=args.strict_webhook) - else: - notify_teams(res, args.webhook, strict=args.strict_webhook) - except requests.RequestException as exc: - logger.error("❌ Webhook error: %s", exc) - if args.strict_webhook: + notifier.send(res) + except Exception as exc: # noqa: BLE001 + logger.error("❌ Notifier error: %s", exc) + if args.strict_notify: raise SystemExit(1) logger.info("✅ Finished %s – %s findings", t, len(res)) return res diff --git a/tests/test_notifiers.py b/tests/test_notifiers.py new file mode 100644 index 0000000..fd3d501 --- /dev/null +++ b/tests/test_notifiers.py @@ -0,0 +1,83 @@ +from modules.base import Finding +from utils.notifiers import ( + SlackNotifier, + TeamsNotifier, + DiscordNotifier, + TelegramNotifier, + ElasticsearchNotifier, + SplunkNotifier, +) +import requests +import pytest + + +def _sample(): + return [Finding(tool="dummy", data={"foo": "bar"})] + + +def _fake_post(payloads): + def _post(url, json=None, headers=None, timeout=10): + payloads["url"] = url + payloads["json"] = json + payloads["headers"] = headers + class Resp: + status_code = 200 + def raise_for_status(self): + pass + return Resp() + return _post + + +def test_slack_notifier(monkeypatch): + payloads = {} + monkeypatch.setattr("utils.notifiers.requests.post", _fake_post(payloads)) + SlackNotifier("http://example.com").send(_sample()) + assert payloads["url"] == "http://example.com" + assert "Pentest-Toolkit" in payloads["json"]["text"] + + +def test_teams_notifier(monkeypatch): + payloads = {} + monkeypatch.setattr("utils.notifiers.requests.post", _fake_post(payloads)) + TeamsNotifier("http://example.com").send(_sample()) + assert payloads["url"] == "http://example.com" + assert "Pentest-Toolkit" in payloads["json"]["text"] + + +def test_discord_notifier(monkeypatch): + payloads = {} + monkeypatch.setattr("utils.notifiers.requests.post", _fake_post(payloads)) + DiscordNotifier("http://example.com").send(_sample()) + assert payloads["url"] == "http://example.com" + assert "Pentest-Toolkit" in payloads["json"]["content"] + + +def test_telegram_notifier(monkeypatch): + payloads = {} + monkeypatch.setattr("utils.notifiers.requests.post", _fake_post(payloads)) + TelegramNotifier("tok", "chat").send(_sample()) + assert "bottok" in payloads["url"] + assert payloads["json"]["chat_id"] == "chat" + + +def test_elasticsearch_notifier(monkeypatch): + payloads = {} + monkeypatch.setattr("utils.notifiers.requests.post", _fake_post(payloads)) + ElasticsearchNotifier("http://es").send(_sample()) + assert payloads["url"] == "http://es" + assert isinstance(payloads["json"], list) + + +def test_splunk_notifier(monkeypatch): + payloads = {} + monkeypatch.setattr("utils.notifiers.requests.post", _fake_post(payloads)) + SplunkNotifier("http://spl", "tok").send(_sample()) + assert payloads["headers"]["Authorization"] == "Splunk tok" + + +def test_notifier_failure(monkeypatch): + def boom(url, json=None, headers=None, timeout=10): + raise requests.RequestException("fail") + monkeypatch.setattr("utils.notifiers.requests.post", boom) + with pytest.raises(requests.RequestException): + SlackNotifier("http://bad", strict=True).send(_sample()) diff --git a/tests/test_notify.py b/tests/test_notify.py deleted file mode 100644 index 55d90d5..0000000 --- a/tests/test_notify.py +++ /dev/null @@ -1,78 +0,0 @@ -from modules.base import Finding -from utils.notify import notify_slack, notify_teams -import requests -import pytest - - -def _sample(): - return [Finding(tool="dummy", data={"foo": "bar"})] - - -def test_notify_slack(monkeypatch): - payloads = {} - - def fake_post(url, json, timeout): - payloads['url'] = url - payloads['json'] = json - class Resp: - status_code = 200 - def raise_for_status(self): - pass - return Resp() - - monkeypatch.setattr("utils.notify.requests.post", fake_post) - notify_slack(_sample(), "http://example.com") - assert payloads['url'] == "http://example.com" - assert "Pentest-Toolkit" in payloads['json']['text'] - - -def test_notify_teams(monkeypatch): - payloads = {} - - def fake_post(url, json, timeout): - payloads['url'] = url - payloads['json'] = json - class Resp: - status_code = 200 - def raise_for_status(self): - pass - return Resp() - - monkeypatch.setattr("utils.notify.requests.post", fake_post) - notify_teams(_sample(), "http://example.com") - assert payloads['url'] == "http://example.com" - assert "Pentest-Toolkit" in payloads['json']['text'] - - -def test_notify_slack_failure(monkeypatch): - def boom(url, json, timeout): - raise requests.RequestException("fail") - - monkeypatch.setattr("utils.notify.requests.post", boom) - notify_slack(_sample(), "http://bad", strict=False) - - -def test_notify_slack_failure_strict(monkeypatch): - def boom(url, json, timeout): - raise requests.RequestException("fail") - - monkeypatch.setattr("utils.notify.requests.post", boom) - with pytest.raises(requests.RequestException): - notify_slack(_sample(), "http://bad", strict=True) - - -def test_notify_teams_failure(monkeypatch): - def boom(url, json, timeout): - raise requests.RequestException("fail") - - monkeypatch.setattr("utils.notify.requests.post", boom) - notify_teams(_sample(), "http://bad", strict=False) - - -def test_notify_teams_failure_strict(monkeypatch): - def boom(url, json, timeout): - raise requests.RequestException("fail") - - monkeypatch.setattr("utils.notify.requests.post", boom) - with pytest.raises(requests.RequestException): - notify_teams(_sample(), "http://bad", strict=True) diff --git a/utils/__init__.py b/utils/__init__.py index 91fa947..b1fecf3 100644 --- a/utils/__init__.py +++ b/utils/__init__.py @@ -2,13 +2,26 @@ from .logger import get_logger from .output import write_json, write_html, write_pdf -from .notify import notify_slack, notify_teams +from .notifiers import ( + Notifier, + SlackNotifier, + TeamsNotifier, + DiscordNotifier, + TelegramNotifier, + ElasticsearchNotifier, + SplunkNotifier, +) __all__ = [ "get_logger", "write_json", "write_html", "write_pdf", - "notify_slack", - "notify_teams", + "Notifier", + "SlackNotifier", + "TeamsNotifier", + "DiscordNotifier", + "TelegramNotifier", + "ElasticsearchNotifier", + "SplunkNotifier", ] diff --git a/utils/notifiers/__init__.py b/utils/notifiers/__init__.py new file mode 100644 index 0000000..8977bcb --- /dev/null +++ b/utils/notifiers/__init__.py @@ -0,0 +1,222 @@ +from __future__ import annotations + +"""Notification plugins for Pentest‑Toolkit.""" + +import json +import os +from abc import ABC, abstractmethod +from typing import Dict, List, Type + +import requests + +from modules.base import Finding +from utils.logger import get_logger + +logger = get_logger() + + +class Notifier(ABC): + """Base class for notification plugins.""" + + registry: Dict[str, Type["Notifier"]] = {} + name: str + + def __init_subclass__(cls, **kwargs): + super().__init_subclass__(**kwargs) + if getattr(cls, "name", None): + Notifier.registry[cls.name] = cls + + @abstractmethod + def send(self, findings: List[Finding]) -> None: + """Send *findings* to the destination.""" + ... + + +class SlackNotifier(Notifier): + """Send results to a Slack webhook.""" + + name = "slack" + + def __init__(self, webhook: str | None = None, *, strict: bool = False): + self.webhook = webhook or os.environ.get("SLACK_WEBHOOK") + self.strict = strict + + def send(self, findings: List[Finding]) -> None: + if not self.webhook: + logger.warning("Slack webhook not configured") + return + payload = { + "text": "Pentest-Toolkit results:\n```%s```" + % json.dumps([f.asdict() for f in findings], indent=2) + } + try: + resp = requests.post(self.webhook, json=payload, timeout=10) + resp.raise_for_status() + logger.info("\U0001F4E3 Slack notification -> %s", resp.status_code) + except requests.RequestException as exc: # noqa: BLE001 + logger.warning("\u26A0\uFE0F Slack notification failed: %s", exc) + if self.strict: + raise + + +class TeamsNotifier(Notifier): + """Send results to a Microsoft Teams webhook.""" + + name = "teams" + + def __init__(self, webhook: str | None = None, *, strict: bool = False): + self.webhook = webhook or os.environ.get("TEAMS_WEBHOOK") + self.strict = strict + + def send(self, findings: List[Finding]) -> None: + if not self.webhook: + logger.warning("Teams webhook not configured") + return + payload = { + "text": "Pentest-Toolkit results:\n%s" + % json.dumps([f.asdict() for f in findings], indent=2) + } + try: + resp = requests.post(self.webhook, json=payload, timeout=10) + resp.raise_for_status() + logger.info("\U0001F4E3 Teams notification -> %s", resp.status_code) + except requests.RequestException as exc: # noqa: BLE001 + logger.warning("\u26A0\uFE0F Teams notification failed: %s", exc) + if self.strict: + raise + + +class DiscordNotifier(Notifier): + """Send results to a Discord webhook.""" + + name = "discord" + + def __init__(self, webhook: str | None = None, *, strict: bool = False): + self.webhook = webhook or os.environ.get("DISCORD_WEBHOOK") + self.strict = strict + + def send(self, findings: List[Finding]) -> None: + if not self.webhook: + logger.warning("Discord webhook not configured") + return + payload = { + "content": "Pentest-Toolkit results:\n```%s```" + % json.dumps([f.asdict() for f in findings], indent=2) + } + try: + resp = requests.post(self.webhook, json=payload, timeout=10) + resp.raise_for_status() + logger.info("\U0001F4E3 Discord notification -> %s", resp.status_code) + except requests.RequestException as exc: # noqa: BLE001 + logger.warning("\u26A0\uFE0F Discord notification failed: %s", exc) + if self.strict: + raise + + +class TelegramNotifier(Notifier): + """Send results via the Telegram Bot API.""" + + name = "telegram" + + def __init__( + self, + token: str | None = None, + chat_id: str | None = None, + *, + strict: bool = False, + ): + self.token = token or os.environ.get("TELEGRAM_BOT_TOKEN") + self.chat_id = chat_id or os.environ.get("TELEGRAM_CHAT_ID") + self.strict = strict + + def send(self, findings: List[Finding]) -> None: + if not self.token or not self.chat_id: + logger.warning("Telegram token/chat_id not configured") + return + url = f"https://api.telegram.org/bot{self.token}/sendMessage" + payload = { + "chat_id": self.chat_id, + "text": "Pentest-Toolkit results:\n```%s```" + % json.dumps([f.asdict() for f in findings], indent=2), + } + try: + resp = requests.post(url, json=payload, timeout=10) + resp.raise_for_status() + logger.info("\U0001F4E3 Telegram notification -> %s", resp.status_code) + except requests.RequestException as exc: # noqa: BLE001 + logger.warning("\u26A0\uFE0F Telegram notification failed: %s", exc) + if self.strict: + raise + + +class ElasticsearchNotifier(Notifier): + """Push results to an Elasticsearch HTTP endpoint.""" + + name = "elasticsearch" + + def __init__(self, url: str | None = None, *, strict: bool = False): + self.url = url or os.environ.get("ELASTICSEARCH_URL") + self.strict = strict + + def send(self, findings: List[Finding]) -> None: + if not self.url: + logger.warning("Elasticsearch URL not configured") + return + try: + resp = requests.post( + self.url, + json=[f.asdict() for f in findings], + timeout=10, + ) + resp.raise_for_status() + logger.info( + "\U0001F4E3 Elasticsearch notification -> %s", + resp.status_code, + ) + except requests.RequestException as exc: # noqa: BLE001 + logger.warning("\u26A0\uFE0F Elasticsearch notification failed: %s", exc) + if self.strict: + raise + + +class SplunkNotifier(Notifier): + """Send results to Splunk via HTTP Event Collector.""" + + name = "splunk" + + def __init__( + self, + url: str | None = None, + token: str | None = None, + *, + strict: bool = False, + ): + self.url = url or os.environ.get("SPLUNK_HEC_URL") + self.token = token or os.environ.get("SPLUNK_HEC_TOKEN") + self.strict = strict + + def send(self, findings: List[Finding]) -> None: + if not self.url or not self.token: + logger.warning("Splunk HEC configuration missing") + return + headers = {"Authorization": f"Splunk {self.token}"} + payload = [{"event": f.asdict()} for f in findings] + try: + resp = requests.post(self.url, json=payload, headers=headers, timeout=10) + resp.raise_for_status() + logger.info("\U0001F4E3 Splunk notification -> %s", resp.status_code) + except requests.RequestException as exc: # noqa: BLE001 + logger.warning("\u26A0\uFE0F Splunk notification failed: %s", exc) + if self.strict: + raise + + +__all__ = [ + "Notifier", + "SlackNotifier", + "TeamsNotifier", + "DiscordNotifier", + "TelegramNotifier", + "ElasticsearchNotifier", + "SplunkNotifier", +] diff --git a/utils/notify.py b/utils/notify.py index b1c1292..c4f224b 100644 --- a/utils/notify.py +++ b/utils/notify.py @@ -1,47 +1,18 @@ from __future__ import annotations -import json -from typing import List - -import requests - -from modules.base import Finding -from utils.logger import get_logger -logger = get_logger() +"""Backward compatible wrappers for notification plugins.""" +from typing import List -def _render(findings: List[Finding]) -> str: - return json.dumps([f.asdict() for f in findings], indent=2) +from modules.base import Finding +from utils.notifiers import SlackNotifier, TeamsNotifier def notify_slack(findings: List[Finding], webhook: str, *, strict: bool = False) -> None: - """Send *findings* to a Slack incoming webhook. - - If *strict* is ``True`` the underlying ``requests`` exception will be raised - to the caller. Otherwise a warning is logged on failure. - """ - payload = {"text": f"Pentest-Toolkit results:\n```{_render(findings)}```"} - try: - resp = requests.post(webhook, json=payload, timeout=10) - resp.raise_for_status() - logger.info("📣 Slack notification -> %s", resp.status_code) - except requests.RequestException as exc: # noqa: BLE001 - logger.warning("⚠️ Slack notification failed: %s", exc) - if strict: - raise + """Send *findings* to a Slack incoming webhook.""" + SlackNotifier(webhook, strict=strict).send(findings) def notify_teams(findings: List[Finding], webhook: str, *, strict: bool = False) -> None: - """Send *findings* to a Teams incoming webhook. - - Behaviour mirrors :func:`notify_slack`. - """ - payload = {"text": f"Pentest-Toolkit results:\n{_render(findings)}"} - try: - resp = requests.post(webhook, json=payload, timeout=10) - resp.raise_for_status() - logger.info("📣 Teams notification -> %s", resp.status_code) - except requests.RequestException as exc: # noqa: BLE001 - logger.warning("⚠️ Teams notification failed: %s", exc) - if strict: - raise + """Send *findings* to a Teams incoming webhook.""" + TeamsNotifier(webhook, strict=strict).send(findings)