From 5958c5e889766695879c80098175b7568e3ea84c Mon Sep 17 00:00:00 2001 From: ttlequals0 Date: Mon, 23 Feb 2026 18:07:39 -0500 Subject: [PATCH 1/2] Fix healthcheck pings blocked by SSRF protection (v2.5.66) Add TRUSTED_INTERNAL_HOSTS env var to allowlist hostnames and CIDR ranges that should bypass SSRF private-IP blocking. Healthcheck servers on private networks were blocked since v2.5.64. --- CHANGELOG.MD | 9 +++ config.py | 5 ++ docker-compose.yml | 8 ++- docs/CONFIGURATION.md | 27 ++++++++ pixelprobe/utils/security.py | 77 ++++++++++++++++++++++- tests/test_security_fixes.py | 116 ++++++++++++++++++++++++++++++++++- version.py | 2 +- 7 files changed, 240 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.MD b/CHANGELOG.MD index e772069..c21e77e 100644 --- a/CHANGELOG.MD +++ b/CHANGELOG.MD @@ -5,6 +5,15 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0). +## [2.5.66] - 2026-02-23 + +### Fixed + +- **Healthcheck pings blocked by SSRF protection**: Since v2.5.64, healthcheck pings for scheduled scans could fail when the healthcheck server resolves to a private IP address. Added `TRUSTED_INTERNAL_HOSTS` environment variable that lets admins allowlist hostnames and/or CIDR ranges that should bypass SSRF private-IP blocking. Accepts comma-separated values (e.g., `myhost.local,192.168.5.0/24`). SSRF protection remains fully active for non-trusted hosts. +- Files affected: `pixelprobe/utils/security.py`, `config.py`, `version.py`, `docker-compose.yml`, `docs/CONFIGURATION.md`, `tests/test_security_fixes.py` + +--- + ## [2.5.65] - 2026-02-20 ### Fixed diff --git a/config.py b/config.py index dcf9cc9..6c9ddce 100644 --- a/config.py +++ b/config.py @@ -87,6 +87,11 @@ class Config: ENABLE_MONITORING = os.getenv('ENABLE_MONITORING', 'false').lower() == 'true' METRICS_PORT = int(os.getenv('METRICS_PORT', '9090')) + # SSRF trusted hosts -- hostnames and/or CIDR ranges that bypass private-IP blocking. + # Read directly from env by security.py (works outside Flask app context too). + # Example: "healthcheck.internal.local,192.168.5.0/24" + TRUSTED_INTERNAL_HOSTS = os.getenv('TRUSTED_INTERNAL_HOSTS', '') + # P2 Data Retention Configuration # Configurable retention periods for automated cleanup # Note: scan_output archival is DISABLED - keeps all scan_results data forever diff --git a/docker-compose.yml b/docker-compose.yml index 936075a..683de78 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -94,7 +94,10 @@ services: # Scheduled scans PERIODIC_SCAN_SCHEDULE: ${PERIODIC_SCAN_SCHEDULE:-} CLEANUP_SCHEDULE: ${CLEANUP_SCHEDULE:-} - + + # SSRF trusted hosts (hostnames/CIDRs that bypass private-IP blocking) + TRUSTED_INTERNAL_HOSTS: ${TRUSTED_INTERNAL_HOSTS:-} + volumes: # Media files to scan - ${MEDIA_PATH:-./media}:/media:ro @@ -154,6 +157,9 @@ services: PERIODIC_SCAN_SCHEDULE: ${PERIODIC_SCAN_SCHEDULE:-} CLEANUP_SCHEDULE: ${CLEANUP_SCHEDULE:-} + # SSRF trusted hosts (hostnames/CIDRs that bypass private-IP blocking) + TRUSTED_INTERNAL_HOSTS: ${TRUSTED_INTERNAL_HOSTS:-} + TZ: America/New_York volumes: diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index c0a6691..9ffa53b 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -483,6 +483,33 @@ python tools/data_retention.py --dry-run ## Security Configuration +### SSRF Trusted Hosts + +PixelProbe includes SSRF protection that blocks outbound requests to private/reserved IP ranges. If you use internal services for healthchecks, notifications (ntfy, webhooks), or similar integrations that resolve to private IPs, you can allowlist them: + +| Variable | Default | Description | +|----------|---------|-------------| +| `TRUSTED_INTERNAL_HOSTS` | (empty) | Comma-separated hostnames and/or CIDR ranges that bypass SSRF private-IP blocking | + +**Examples:** +```bash +# Single hostname +TRUSTED_INTERNAL_HOSTS=healthcheck.internal.local + +# Hostname + subnet +TRUSTED_INTERNAL_HOSTS=healthcheck.internal.local,192.168.5.0/24 + +# Multiple entries +TRUSTED_INTERNAL_HOSTS=healthcheck.internal.local,ntfy.internal.local,10.0.0.0/8 +``` + +**Notes:** +- Hostname matching is case-insensitive +- CIDR ranges apply to resolved IPs regardless of hostname +- A bare IP (e.g., `10.0.0.5`) is treated as a `/32` single-host range +- Must be set in both `pixelprobe` and `celery-worker` containers (or via shared `.env`) +- Public IPs are always allowed; this setting only affects private/reserved ranges + ### Secret Key Generation Generate a secure secret key: diff --git a/pixelprobe/utils/security.py b/pixelprobe/utils/security.py index 74b8af1..bddf0ce 100644 --- a/pixelprobe/utils/security.py +++ b/pixelprobe/utils/security.py @@ -9,7 +9,7 @@ import urllib.parse from functools import wraps from datetime import datetime, timezone -from typing import Optional, Tuple +from typing import Optional, Set, Tuple from flask import request, jsonify, current_app from werkzeug.security import safe_join from models import db, ScanConfiguration @@ -17,6 +17,74 @@ logger = logging.getLogger(__name__) + +# Trusted internal hosts cache (lazy-loaded from TRUSTED_INTERNAL_HOSTS env var) +_trusted_hostnames: Optional[Set[str]] = None +_trusted_networks: Optional[Set[ipaddress.IPv4Network | ipaddress.IPv6Network]] = None + + +def _load_trusted_hosts(): + """Parse TRUSTED_INTERNAL_HOSTS env var into hostname and network sets. + + Format: comma-separated list of hostnames and/or CIDR ranges. + Example: "healthcheck.internal.local,192.168.5.0/24,10.0.0.1" + + Results are cached on first call. Call _reset_trusted_hosts() to clear + the cache (useful for testing). + """ + global _trusted_hostnames, _trusted_networks + + if _trusted_hostnames is not None: + return + + _trusted_hostnames = set() + _trusted_networks = set() + + raw = os.environ.get('TRUSTED_INTERNAL_HOSTS', '').strip() + if not raw: + return + + for entry in raw.split(','): + entry = entry.strip() + if not entry: + continue + # Try parsing as an IP network (CIDR or bare IP) + try: + network = ipaddress.ip_network(entry, strict=False) + _trusted_networks.add(network) + logger.info(f"Trusted internal network: {network}") + except ValueError: + # Not a valid network -- treat as a hostname + _trusted_hostnames.add(entry.lower()) + logger.info(f"Trusted internal hostname: {entry.lower()}") + + +def _reset_trusted_hosts(): + """Clear the trusted hosts cache (for testing).""" + global _trusted_hostnames, _trusted_networks + _trusted_hostnames = None + _trusted_networks = None + + +def _is_trusted(hostname: str, ip_str: str) -> bool: + """Check whether a hostname or resolved IP is in the trusted allowlist.""" + _load_trusted_hosts() + + # Check hostname + if hostname and hostname.lower() in _trusted_hostnames: + return True + + # Check IP against trusted networks + try: + ip = ipaddress.ip_address(ip_str) + for network in _trusted_networks: + if ip in network: + return True + except ValueError: + pass + + return False + class SecurityError(Exception): """Base exception for security-related errors""" pass @@ -396,6 +464,13 @@ def validate_safe_url(url: str) -> Tuple[bool, Optional[str]]: for network in _BLOCKED_NETWORKS: if ip in network: + # Check trusted allowlist before blocking + if _is_trusted(hostname, ip_str): + logger.debug( + f"Allowing trusted internal request: {url} " + f"(resolved to {ip_str}, hostname={hostname})" + ) + break # Trusted -- skip remaining blocked networks for this IP AuditLogger.log_security_event( 'ssrf_blocked', f"Blocked outbound request to private IP: {url} resolved to {ip_str}", diff --git a/tests/test_security_fixes.py b/tests/test_security_fixes.py index 17c7c58..9cf0255 100644 --- a/tests/test_security_fixes.py +++ b/tests/test_security_fixes.py @@ -1,9 +1,11 @@ """ -Tests for security fixes in v2.5.64: +Tests for security fixes in v2.5.64+: - Authentication bypass via X-Internal-Request header - SSRF via healthcheck and webhook URLs +- Trusted internal hosts allowlist (v2.5.66) """ +import os import pytest from unittest.mock import patch, MagicMock import requests @@ -224,3 +226,115 @@ def test_webhook_public_url_accepted(self, mock_validate): 'webhook_url': 'https://hooks.slack.com/services/T00/B00/xxxx' }) assert error is None + + +# ==================== Trusted Internal Hosts Tests (v2.5.66) ==================== + + +class TestTrustedInternalHosts: + """Tests for TRUSTED_INTERNAL_HOSTS allowlist bypassing SSRF private-IP blocking""" + + def setup_method(self): + """Reset the trusted hosts cache before each test""" + from pixelprobe.utils.security import _reset_trusted_hosts + _reset_trusted_hosts() + + def teardown_method(self): + """Reset the cache and env var after each test""" + from pixelprobe.utils.security import _reset_trusted_hosts + _reset_trusted_hosts() + os.environ.pop('TRUSTED_INTERNAL_HOSTS', None) + + @patch('pixelprobe.utils.security.socket.getaddrinfo') + def test_trusted_hostname_bypasses_ssrf_block(self, mock_getaddrinfo): + """A hostname in TRUSTED_INTERNAL_HOSTS should be allowed even if it resolves to a private IP""" + from pixelprobe.utils.security import validate_safe_url + os.environ['TRUSTED_INTERNAL_HOSTS'] = 'healthcheck.internal.local' + mock_getaddrinfo.return_value = [ + (2, 1, 6, '', ('192.168.5.33', 80)) + ] + is_safe, error = validate_safe_url('http://healthcheck.internal.local/ping/abc') + assert is_safe, f"Expected safe but got error: {error}" + assert error is None + + @patch('pixelprobe.utils.security.socket.getaddrinfo') + def test_trusted_cidr_bypasses_ssrf_block(self, mock_getaddrinfo): + """An IP within a trusted CIDR range should be allowed""" + from pixelprobe.utils.security import validate_safe_url + os.environ['TRUSTED_INTERNAL_HOSTS'] = '192.168.5.0/24' + mock_getaddrinfo.return_value = [ + (2, 1, 6, '', ('192.168.5.33', 80)) + ] + is_safe, error = validate_safe_url('http://internal-service.local/api') + assert is_safe, f"Expected safe but got error: {error}" + assert error is None + + @patch('pixelprobe.utils.security.socket.getaddrinfo') + def test_trusted_bare_ip_bypasses_ssrf_block(self, mock_getaddrinfo): + """A single trusted IP (no CIDR suffix) should be treated as /32 and allowed""" + from pixelprobe.utils.security import validate_safe_url + os.environ['TRUSTED_INTERNAL_HOSTS'] = '10.0.0.5' + mock_getaddrinfo.return_value = [ + (2, 1, 6, '', ('10.0.0.5', 443)) + ] + is_safe, error = validate_safe_url('https://10.0.0.5/webhook') + assert is_safe, f"Expected safe but got error: {error}" + assert error is None + + @patch('pixelprobe.utils.security.socket.getaddrinfo') + def test_non_trusted_private_ip_still_blocked(self, mock_getaddrinfo): + """Private IPs NOT in the trusted list must still be blocked""" + from pixelprobe.utils.security import validate_safe_url + os.environ['TRUSTED_INTERNAL_HOSTS'] = 'healthcheck.internal.local' + mock_getaddrinfo.return_value = [ + (2, 1, 6, '', ('192.168.1.1', 80)) + ] + is_safe, error = validate_safe_url('http://evil.internal/') + assert not is_safe + assert 'private' in error.lower() or 'reserved' in error.lower() + + @patch('pixelprobe.utils.security.socket.getaddrinfo') + def test_empty_trusted_hosts_preserves_blocking(self, mock_getaddrinfo): + """When TRUSTED_INTERNAL_HOSTS is empty/unset, all private IPs should be blocked""" + from pixelprobe.utils.security import validate_safe_url + os.environ.pop('TRUSTED_INTERNAL_HOSTS', None) + mock_getaddrinfo.return_value = [ + (2, 1, 6, '', ('192.168.5.33', 80)) + ] + is_safe, error = validate_safe_url('http://healthcheck.internal.local/ping/abc') + assert not is_safe + assert 'private' in error.lower() or 'reserved' in error.lower() + + @patch('pixelprobe.utils.security.socket.getaddrinfo') + def test_trusted_hostname_case_insensitive(self, mock_getaddrinfo): + """Hostname matching should be case-insensitive""" + from pixelprobe.utils.security import validate_safe_url + os.environ['TRUSTED_INTERNAL_HOSTS'] = 'HealthCheck.Internal.Local' + mock_getaddrinfo.return_value = [ + (2, 1, 6, '', ('192.168.5.33', 80)) + ] + is_safe, error = validate_safe_url('http://healthcheck.internal.local/ping/abc') + assert is_safe, f"Expected safe but got error: {error}" + + @patch('pixelprobe.utils.security.socket.getaddrinfo') + def test_multiple_trusted_entries(self, mock_getaddrinfo): + """Multiple comma-separated entries should all be recognized""" + from pixelprobe.utils.security import validate_safe_url + os.environ['TRUSTED_INTERNAL_HOSTS'] = 'healthcheck.internal.local, ntfy.internal, 10.0.0.0/8' + # Test hostname match + mock_getaddrinfo.return_value = [ + (2, 1, 6, '', ('192.168.5.33', 80)) + ] + is_safe, _ = validate_safe_url('http://healthcheck.internal.local/ping') + assert is_safe + + @patch('pixelprobe.utils.security.socket.getaddrinfo') + def test_ip_outside_trusted_cidr_still_blocked(self, mock_getaddrinfo): + """An IP outside the trusted CIDR range must still be blocked""" + from pixelprobe.utils.security import validate_safe_url + os.environ['TRUSTED_INTERNAL_HOSTS'] = '192.168.5.0/24' + mock_getaddrinfo.return_value = [ + (2, 1, 6, '', ('192.168.6.1', 80)) + ] + is_safe, error = validate_safe_url('http://other.internal/') + assert not is_safe diff --git a/version.py b/version.py index e85a96e..049f58c 100644 --- a/version.py +++ b/version.py @@ -4,7 +4,7 @@ # Default version - this is the single source of truth -_DEFAULT_VERSION = '2.5.65' +_DEFAULT_VERSION = '2.5.66' # Allow override via environment variable for CI/CD, but default to the hardcoded version From e03ee2be378ce1f84be51f68bf4f5ed750ede194 Mon Sep 17 00:00:00 2001 From: ttlequals0 Date: Mon, 23 Feb 2026 18:12:12 -0500 Subject: [PATCH 2/2] Fix Python 3.9 compatibility in security.py type annotation Use Union[X, Y] instead of X | Y (PEP 604 requires Python 3.10+). --- pixelprobe/utils/security.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pixelprobe/utils/security.py b/pixelprobe/utils/security.py index bddf0ce..e2c4db1 100644 --- a/pixelprobe/utils/security.py +++ b/pixelprobe/utils/security.py @@ -9,7 +9,7 @@ import urllib.parse from functools import wraps from datetime import datetime, timezone -from typing import Optional, Set, Tuple +from typing import Optional, Set, Tuple, Union from flask import request, jsonify, current_app from werkzeug.security import safe_join from models import db, ScanConfiguration @@ -20,7 +20,7 @@ # Trusted internal hosts cache (lazy-loaded from TRUSTED_INTERNAL_HOSTS env var) _trusted_hostnames: Optional[Set[str]] = None -_trusted_networks: Optional[Set[ipaddress.IPv4Network | ipaddress.IPv6Network]] = None +_trusted_networks: Optional[Set[Union[ipaddress.IPv4Network, ipaddress.IPv6Network]]] = None def _load_trusted_hosts():