Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions PLUGINS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Plugin Development

Pentest-Toolkit supports extending both scanning modules and notification sinks.
Drop Python files into a `plugins/` directory or set `PENTEST_TOOLKIT_PLUGIN_DIR`
to point elsewhere. Any subclasses of `Module` or `Notifier` discovered in these
files are automatically registered.

## Example Module

```python
from modules.base import Module, Finding

class DirBuster(Module):
name = "dirbuster"
def run(self, target: str):
return [Finding(tool=self.name, data={"target": target})]
```

## Example Notifier

```python
from utils.notifiers import Notifier

class FileNotifier(Notifier):
name = "file"
def __init__(self, path="results.log"):
self.path = path
def send(self, findings):
open(self.path, "a").write(str(findings))
```

Enable with `--notify file` (and provide any config via environment variables or
custom flags).
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,18 @@ trivial.
]
```

Results are stored in `output/` with timestamps. They can also be pushed to
Slack or Microsoft Teams using `--webhook <url> --webhook-type slack|teams`.
Results are stored in `output/` with timestamps. Notifications can be sent via
`--notify <name>` where `<name>` is one of `slack`, `teams`, `discord`,
`telegram`, `elasticsearch` or `splunk`.

---

## Plugins

Drop Python files into a `plugins/` directory or set the
`PENTEST_TOOLKIT_PLUGIN_DIR` environment variable. Each plugin defines a
`Module` subclass that automatically registers when imported.
`Module` or `Notifier` subclass that automatically registers when imported.
See [PLUGINS.md](PLUGINS.md) for more examples.

```python
# plugins/dirbuster.py
Expand Down
31 changes: 16 additions & 15 deletions lambda_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@

from modules.base import Module
from utils.deps import check_dependencies, DependencyError
import requests
from utils.logger import get_logger
from utils.notify import notify_slack, notify_teams
from utils.notifiers import Notifier
from utils.output import (
write_html,
write_json,
Expand All @@ -32,22 +31,26 @@ def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
- ``target``: domain or IP address to scan (required)
- ``tools``: optional list of tools
- ``report``: ``"html"``, ``"pdf"``, ``"markdown"`` or ``"summary"``
- ``webhook``: optional incoming webhook URL
- ``webhook_type``: ``"slack"`` or ``"teams"`` (default ``"slack"``)
- ``notify``: list of notifier names (e.g. ``["slack"]``)
- ``auto_install``: attempt to install missing binaries (bool)
"""

target = event["target"]
tools: List[str] = event.get("tools", list(Module.registry.keys()))
report = event.get("report")
webhook = event.get("webhook")
webhook_type = event.get("webhook_type", "slack")
strict_webhook = event.get("strict_webhook", False)
notify_names: List[str] = event.get("notify", [])
strict_notify = event.get("strict_notify", False)
auto_install = event.get("auto_install", False)
pipeline_mode = event.get("pipeline", False)

out_dir = Path("/tmp/output")

notifiers = [
Notifier.registry[name](strict=strict_notify)
for name in notify_names
if name in Notifier.registry
]

try:
check_dependencies(tools, auto_install=auto_install)
except DependencyError as exc:
Expand All @@ -67,14 +70,12 @@ def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
elif report == "summary":
write_markdown(findings, out_dir, prefix=prefix, summary_only=True)

if webhook:
for notifier in notifiers:
try:
if webhook_type == "teams":
notify_teams(findings, webhook, strict=strict_webhook)
else:
notify_slack(findings, webhook, strict=strict_webhook)
except requests.RequestException as exc: # noqa: BLE001
logger.error("❌ Webhook error: %s", exc)
raise RuntimeError(str(exc))
notifier.send(findings)
except Exception as exc: # noqa: BLE001
logger.error("❌ Notifier error: %s", exc)
if strict_notify:
raise RuntimeError(str(exc))

return {"count": len(findings), "json": str(json_path)}
47 changes: 27 additions & 20 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from __future__ import annotations

import argparse
import os
from pathlib import Path
from typing import List
from concurrent.futures import ThreadPoolExecutor
Expand All @@ -22,7 +23,6 @@

from modules.base import Module, Finding
import modules # noqa: F401 # populate Module.registry
import requests
from utils.logger import get_logger
from utils.output import (
write_json,
Expand All @@ -31,7 +31,7 @@
write_markdown,
safe_filename_component,
)
from utils.notify import notify_slack, notify_teams
from utils.notifiers import Notifier
from utils.deps import DependencyError, check_dependencies
from utils.plugins import load_plugins

Expand Down Expand Up @@ -264,19 +264,15 @@ def cli() -> None:
help="Generate an HTML, Markdown, PDF or summary report in addition to JSON",
)
parser.add_argument(
"--webhook",
help="Slack or Teams webhook URL to post results",
"--notify",
action="append",
choices=list(Notifier.registry.keys()),
help="Enable one or more notifiers",
)
parser.add_argument(
"--webhook-type",
choices=["slack", "teams"],
default="slack",
help="Webhook service type (default: slack)",
)
parser.add_argument(
"--strict-webhook",
"--strict-notify",
action="store_true",
help="Exit with error if webhook notification fails",
help="Exit with error if notifier fails",
)
parser.add_argument(
"--pipeline",
Expand All @@ -295,6 +291,20 @@ def cli() -> None:
)
args = parser.parse_args()

notify_names: List[str] = []
if args.notify:
notify_names.extend(args.notify)
else:
env = os.environ.get("PENTEST_TOOLKIT_NOTIFY")
if env:
notify_names.extend([n.strip() for n in env.split(",") if n.strip()])

notifiers = [
Notifier.registry[name](strict=args.strict_notify)
for name in notify_names
if name in Notifier.registry
]

targets = _collect_targets(args)
logger.info("🎯 Targets: %s", ", ".join(targets))
logger.info("🧰 Tools : %s", ", ".join(args.tools))
Expand Down Expand Up @@ -327,15 +337,12 @@ def run_one(t: str) -> List[Finding]:
elif args.report == "summary":
write_markdown(res, args.out, prefix=prefix, summary_only=True)

if args.webhook:
for notifier in notifiers:
try:
if args.webhook_type == "slack":
notify_slack(res, args.webhook, strict=args.strict_webhook)
else:
notify_teams(res, args.webhook, strict=args.strict_webhook)
except requests.RequestException as exc:
logger.error("❌ Webhook error: %s", exc)
if args.strict_webhook:
notifier.send(res)
except Exception as exc: # noqa: BLE001
logger.error("❌ Notifier error: %s", exc)
if args.strict_notify:
raise SystemExit(1)
logger.info("✅ Finished %s – %s findings", t, len(res))
return res
Expand Down
83 changes: 83 additions & 0 deletions tests/test_notifiers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from modules.base import Finding
from utils.notifiers import (
SlackNotifier,
TeamsNotifier,
DiscordNotifier,
TelegramNotifier,
ElasticsearchNotifier,
SplunkNotifier,
)
import requests
import pytest


def _sample():
return [Finding(tool="dummy", data={"foo": "bar"})]


def _fake_post(payloads):
def _post(url, json=None, headers=None, timeout=10):
payloads["url"] = url
payloads["json"] = json
payloads["headers"] = headers
class Resp:
status_code = 200
def raise_for_status(self):
pass
return Resp()
return _post


def test_slack_notifier(monkeypatch):
payloads = {}
monkeypatch.setattr("utils.notifiers.requests.post", _fake_post(payloads))
SlackNotifier("http://example.com").send(_sample())
assert payloads["url"] == "http://example.com"
assert "Pentest-Toolkit" in payloads["json"]["text"]


def test_teams_notifier(monkeypatch):
payloads = {}
monkeypatch.setattr("utils.notifiers.requests.post", _fake_post(payloads))
TeamsNotifier("http://example.com").send(_sample())
assert payloads["url"] == "http://example.com"
assert "Pentest-Toolkit" in payloads["json"]["text"]


def test_discord_notifier(monkeypatch):
payloads = {}
monkeypatch.setattr("utils.notifiers.requests.post", _fake_post(payloads))
DiscordNotifier("http://example.com").send(_sample())
assert payloads["url"] == "http://example.com"
assert "Pentest-Toolkit" in payloads["json"]["content"]


def test_telegram_notifier(monkeypatch):
payloads = {}
monkeypatch.setattr("utils.notifiers.requests.post", _fake_post(payloads))
TelegramNotifier("tok", "chat").send(_sample())
assert "bottok" in payloads["url"]
assert payloads["json"]["chat_id"] == "chat"


def test_elasticsearch_notifier(monkeypatch):
payloads = {}
monkeypatch.setattr("utils.notifiers.requests.post", _fake_post(payloads))
ElasticsearchNotifier("http://es").send(_sample())
assert payloads["url"] == "http://es"
assert isinstance(payloads["json"], list)


def test_splunk_notifier(monkeypatch):
payloads = {}
monkeypatch.setattr("utils.notifiers.requests.post", _fake_post(payloads))
SplunkNotifier("http://spl", "tok").send(_sample())
assert payloads["headers"]["Authorization"] == "Splunk tok"


def test_notifier_failure(monkeypatch):
def boom(url, json=None, headers=None, timeout=10):
raise requests.RequestException("fail")
monkeypatch.setattr("utils.notifiers.requests.post", boom)
with pytest.raises(requests.RequestException):
SlackNotifier("http://bad", strict=True).send(_sample())
78 changes: 0 additions & 78 deletions tests/test_notify.py

This file was deleted.

Loading