Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions CHANGELOG.MD
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,27 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0).

## [2.5.65] - 2026-02-20

### Fixed

- **Share INTERNAL_API_SECRET via Redis across gunicorn workers**: The auto-generated secret was unique per gunicorn worker, causing scheduler internal requests to fail with 401 when routed to a different worker. Now uses Redis `SETNX` to generate the secret once and share it across all workers and the celery container.
- Files affected: `app.py`, `version.py`

---

## [2.5.64] - 2026-02-19

### Security

- **Fix authentication bypass via X-Internal-Request header**: Replace trivially spoofable static string check (`X-Internal-Request: scheduler`) with cryptographic HMAC validation using a shared secret (`X-Internal-Secret`). Secret is auto-generated at startup if not set via `INTERNAL_API_SECRET` environment variable.
- **Fix SSRF in healthcheck and notification services**: Add `validate_safe_url()` function that resolves hostnames and blocks requests to private/reserved IP ranges (RFC 1918, link-local, loopback, cloud metadata 169.254.x.x). Applied to healthcheck pings, webhook notifications, and ntfy notifications.
- **Add redirect-safe HTTP session**: New `create_safe_session()` creates a `requests.Session` with a response hook that validates redirect `Location` headers against the same private IP blocklist, preventing SSRF via DNS rebinding or redirect chains.
- **Fix ntfy config field name mismatch**: `_validate_provider_config()` now accepts both `server_url` (canonical, matching notification_service.py) and `server` (legacy) field names for backward compatibility.
- Files affected: `auth.py`, `config.py`, `app.py`, `scheduler.py`, `pixelprobe/utils/security.py`, `pixelprobe/services/healthcheck_service.py`, `pixelprobe/services/notification_service.py`, `pixelprobe/api/notification_routes.py`, `version.py`

---

## [2.5.63] - 2026-02-18

### Code Simplification
Expand Down
36 changes: 36 additions & 0 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,42 @@
config_class = get_config(config_name)
config_class.init_app(app)

# Auto-generate INTERNAL_API_SECRET if not set via environment
# This secret is used for scheduler-to-app internal authentication
# In multi-worker/multi-container setups, the secret is shared via Redis
# so all gunicorn workers and the celery worker use the same value.
if not app.config.get('INTERNAL_API_SECRET'):
import secrets
_REDIS_SECRET_KEY = 'pixelprobe:internal_api_secret'
_secret_loaded = False
try:
from pixelprobe.progress_utils import get_redis_client
_redis = get_redis_client()
if _redis:
_existing = _redis.get(_REDIS_SECRET_KEY)
if _existing:
app.config['INTERNAL_API_SECRET'] = _existing.decode('utf-8') if isinstance(_existing, bytes) else _existing
_secret_loaded = True
logger.info("Loaded INTERNAL_API_SECRET from Redis (shared across workers)")
else:
_new_secret = secrets.token_urlsafe(32)
# Use SETNX to avoid race conditions between workers
if _redis.set(_REDIS_SECRET_KEY, _new_secret, nx=True):
app.config['INTERNAL_API_SECRET'] = _new_secret
logger.info("Generated and stored INTERNAL_API_SECRET in Redis")
else:
# Another worker beat us to it, read theirs
_existing = _redis.get(_REDIS_SECRET_KEY)
app.config['INTERNAL_API_SECRET'] = _existing.decode('utf-8') if isinstance(_existing, bytes) else _existing
logger.info("Loaded INTERNAL_API_SECRET from Redis (set by sibling worker)")
_secret_loaded = True
except Exception as e:
logger.warning(f"Could not use Redis for INTERNAL_API_SECRET: {e}")

if not _secret_loaded:
app.config['INTERNAL_API_SECRET'] = secrets.token_urlsafe(32)
logger.info("Auto-generated INTERNAL_API_SECRET (Redis unavailable, single-worker mode)")

# Backward compatibility - keep old environment variable support
if not app.config.get('SECRET_KEY'):
SECRET_KEY = os.environ.get('SECRET_KEY')
Expand Down
15 changes: 10 additions & 5 deletions auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
Handles user authentication, session management, and API token validation
"""

import hmac
import logging
from functools import wraps
from datetime import datetime, timezone

from flask import jsonify, request, redirect, url_for, session
from flask import jsonify, request, redirect, url_for, session, current_app
from flask_login import LoginManager, login_user, logout_user, login_required, current_user
from flask_restx import abort as restx_abort
from models import User, APIToken, db
Expand Down Expand Up @@ -113,8 +114,10 @@ def auth_required(f):
"""
@wraps(f)
def decorated_function(*args, **kwargs):
# Allow internal scheduler requests without authentication
if request.headers.get('X-Internal-Request') == 'scheduler':
# Allow internal scheduler requests authenticated by shared secret
internal_secret = request.headers.get('X-Internal-Secret', '')
expected_secret = current_app.config.get('INTERNAL_API_SECRET', '')
if internal_secret and expected_secret and hmac.compare_digest(internal_secret, expected_secret):
return f(*args, **kwargs)

# Check if user is authenticated via session
Expand Down Expand Up @@ -142,8 +145,10 @@ def check_auth():
Returns True if authenticated, False otherwise.
Use this inside Flask-RESTX Resource methods instead of the decorator.
"""
# Allow internal scheduler requests without authentication
if request.headers.get('X-Internal-Request') == 'scheduler':
# Allow internal scheduler requests authenticated by shared secret
internal_secret = request.headers.get('X-Internal-Secret', '')
expected_secret = current_app.config.get('INTERNAL_API_SECRET', '')
if internal_secret and expected_secret and hmac.compare_digest(internal_secret, expected_secret):
return True

# Check if user is authenticated via session
Expand Down
4 changes: 4 additions & 0 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ class Config:
SECRET_KEY = os.environ.get('SECRET_KEY')
if not SECRET_KEY:
raise ValueError("SECRET_KEY environment variable must be set")

# Internal API secret for scheduler-to-app authentication
# If not set, app.py will auto-generate one at startup
INTERNAL_API_SECRET = os.environ.get('INTERNAL_API_SECRET', '')

# PostgreSQL configuration
POSTGRES_HOST = os.getenv('POSTGRES_HOST', 'localhost')
Expand Down
13 changes: 12 additions & 1 deletion pixelprobe/api/notification_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from models import db, NotificationProvider, NotificationRule
from auth import auth_required
from pixelprobe.services.notification_service import NotificationService
from pixelprobe.utils.security import validate_safe_url

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -446,11 +447,21 @@ def _validate_provider_config(provider_type: str, config: dict) -> Optional[str]
elif provider_type == 'ntfy':
if not config.get('topic'):
return 'ntfy topic is required'
if not config.get('server'):
# Support both 'server_url' (canonical) and 'server' (legacy) field names
server_url = config.get('server_url') or config.get('server')
if not server_url:
return 'ntfy server URL is required'
# SSRF protection: validate server URL
is_safe, error = validate_safe_url(f"{server_url}/{config.get('topic')}")
if not is_safe:
return f'ntfy server URL blocked by security policy: {error}'

elif provider_type == 'webhook':
if not config.get('webhook_url'):
return 'Webhook URL is required'
# SSRF protection: validate webhook URL
is_safe, error = validate_safe_url(config['webhook_url'])
if not is_safe:
return f'Webhook URL blocked by security policy: {error}'

return None
26 changes: 25 additions & 1 deletion pixelprobe/services/healthcheck_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import requests
from typing import Optional, Dict
from datetime import datetime
from pixelprobe.utils.security import validate_safe_url, create_safe_session

logger = logging.getLogger(__name__)

Expand All @@ -16,7 +17,7 @@ class HealthcheckService:
"""Service for managing healthchecks.io integrations"""

def __init__(self):
self.session = requests.Session()
self.session = create_safe_session()
self.session.headers.update({
'User-Agent': 'PixelProbe-Healthcheck/1.0'
})
Expand All @@ -35,6 +36,12 @@ def ping_start(self, healthcheck_url: str) -> bool:
logger.warning("No healthcheck URL provided")
return False

# SSRF protection: validate URL before making request
is_safe, error = validate_safe_url(healthcheck_url)
if not is_safe:
logger.warning(f"Healthcheck start ping blocked (SSRF): {error}")
return False

# Healthchecks.io uses /start slug for start signals
start_url = f"{healthcheck_url.rstrip('/')}/start"
logger.info(f"Sending healthcheck start ping to: {start_url}")
Expand Down Expand Up @@ -70,6 +77,12 @@ def ping_success(self, healthcheck_url: str, report_data: Optional[Dict] = None)
logger.warning("No healthcheck URL provided")
return False

# SSRF protection: validate URL before making request
is_safe, error = validate_safe_url(healthcheck_url)
if not is_safe:
logger.warning(f"Healthcheck success ping blocked (SSRF): {error}")
return False

logger.info(f"Sending healthcheck success ping to: {healthcheck_url}")

# Format report data if provided
Expand Down Expand Up @@ -135,6 +148,12 @@ def ping_fail(self, healthcheck_url: str, error_message: Optional[str] = None) -
logger.warning("No healthcheck URL provided")
return False

# SSRF protection: validate URL before making request
is_safe, error = validate_safe_url(healthcheck_url)
if not is_safe:
logger.warning(f"Healthcheck failure ping blocked (SSRF): {error}")
return False

# Healthchecks.io uses /fail slug for failure signals
fail_url = f"{healthcheck_url.rstrip('/')}/fail"
logger.info(f"Sending healthcheck failure ping to: {fail_url}")
Expand Down Expand Up @@ -230,6 +249,11 @@ def validate_url(self, healthcheck_url: str) -> tuple[bool, Optional[str]]:
if not healthcheck_url.startswith(('http://', 'https://')):
return False, "Healthcheck URL must start with http:// or https://"

# SSRF protection: validate against private IP ranges
is_safe, ssrf_error = validate_safe_url(healthcheck_url)
if not is_safe:
return False, f"URL blocked by security policy: {ssrf_error}"

# Log the URL format for debugging
# Supports both public hc-ping.com and self-hosted instances
# Public: https://hc-ping.com/UUID
Expand Down
14 changes: 13 additions & 1 deletion pixelprobe/services/notification_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import requests
from typing import Dict, Optional, List
from datetime import datetime, timezone
from pixelprobe.utils.security import validate_safe_url, create_safe_session

logger = logging.getLogger(__name__)

Expand All @@ -19,7 +20,7 @@ class NotificationService:
"""Service for sending notifications via various providers"""

def __init__(self):
self.session = requests.Session()
self.session = create_safe_session()
self.session.headers.update({'User-Agent': 'PixelProbe-Notification/1.0'})
self.timeout = 10

Expand Down Expand Up @@ -171,6 +172,12 @@ def _send_ntfy(
if token:
headers['Authorization'] = f'Bearer {token}'

# SSRF protection: validate URL before making request
ntfy_url = f"{server_url}/{topic}"
is_safe, error = validate_safe_url(ntfy_url)
if not is_safe:
return False, f"ntfy URL blocked by security policy: {error}"

try:
response = self.session.post(
f"{server_url}/{topic}",
Expand Down Expand Up @@ -288,6 +295,11 @@ def _send_webhook(
headers = {'Content-Type': 'application/json'}
headers.update(custom_headers)

# SSRF protection: validate URL before making request
is_safe, error = validate_safe_url(webhook_url)
if not is_safe:
return False, f"Webhook URL blocked by security policy: {error}"

try:
if method == 'POST':
response = self.session.post(
Expand Down
106 changes: 105 additions & 1 deletion pixelprobe/utils/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@
"""
import os
import re
import socket
import ipaddress
import logging
import urllib.parse
from functools import wraps
from datetime import datetime, timezone
from typing import Optional, Tuple
from flask import request, jsonify, current_app
from werkzeug.security import safe_join
from models import db, ScanConfiguration
import requests

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -328,4 +333,103 @@ def decorated_function(*args, **kwargs):

return f(*args, **kwargs)
return decorated_function
return decorator
return decorator


# SSRF protection

# Private/reserved IP networks that should never be targeted by outbound requests
_BLOCKED_NETWORKS = [
ipaddress.ip_network('127.0.0.0/8'), # Loopback
ipaddress.ip_network('10.0.0.0/8'), # RFC 1918
ipaddress.ip_network('172.16.0.0/12'), # RFC 1918
ipaddress.ip_network('192.168.0.0/16'), # RFC 1918
ipaddress.ip_network('169.254.0.0/16'), # Link-local / cloud metadata
ipaddress.ip_network('0.0.0.0/8'), # "This" network
ipaddress.ip_network('::1/128'), # IPv6 loopback
ipaddress.ip_network('fc00::/7'), # IPv6 unique local
ipaddress.ip_network('fe80::/10'), # IPv6 link-local
]


def validate_safe_url(url: str) -> Tuple[bool, Optional[str]]:
"""Validate that a URL does not target private/internal IP ranges (SSRF protection).

Args:
url: The URL to validate

Returns:
Tuple of (is_safe, error_message). is_safe is True if URL is safe to request.
"""
if not url or not url.strip():
return False, "URL is empty"

try:
parsed = urllib.parse.urlparse(url)
except Exception:
return False, "Invalid URL format"

# Validate scheme
if parsed.scheme not in ('http', 'https'):
return False, f"URL scheme must be http or https, got: {parsed.scheme or 'none'}"

# Reject embedded credentials (user:pass@host)
if parsed.username or parsed.password:
return False, "URLs with embedded credentials are not allowed"

hostname = parsed.hostname
if not hostname:
return False, "URL has no hostname"

# Resolve hostname and check all resolved IPs
try:
addr_infos = socket.getaddrinfo(hostname, parsed.port or (443 if parsed.scheme == 'https' else 80))
except socket.gaierror:
return False, f"Could not resolve hostname: {hostname}"

for addr_info in addr_infos:
ip_str = addr_info[4][0]
try:
ip = ipaddress.ip_address(ip_str)
except ValueError:
continue

for network in _BLOCKED_NETWORKS:
if ip in network:
AuditLogger.log_security_event(
'ssrf_blocked',
f"Blocked outbound request to private IP: {url} resolved to {ip_str}",
severity='warning'
)
return False, f"URL resolves to a private/reserved IP address ({ip_str})"

return True, None


def create_safe_session(max_redirects: int = 5) -> requests.Session:
"""Create a requests.Session that validates redirect targets against private IP ranges.

Args:
max_redirects: Maximum number of redirects to follow

Returns:
A requests.Session configured with SSRF-safe redirect handling
"""
session = requests.Session()
session.max_redirects = max_redirects

def _check_redirect(response, *args, **kwargs):
"""Response hook that validates redirect Location headers."""
if response.is_redirect or response.is_permanent_redirect:
location = response.headers.get('Location')
if location:
# Resolve relative redirects against the request URL
redirect_url = urllib.parse.urljoin(response.url, location)
is_safe, error = validate_safe_url(redirect_url)
if not is_safe:
raise requests.ConnectionError(
f"Redirect blocked by SSRF protection: {error}"
)

session.hooks['response'].append(_check_redirect)
return session
Loading