Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .jules/sentinel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
## 2025-02-18 - [Preventing SSRF via DNS Rebinding with Post-Connect Verification]

Check notice

Code scanning / Remark-lint (reported by Codacy)

Warn when references to undefined definitions are found. Note

[no-undefined-references] Found reference to undefined definition

Check notice

Code scanning / Remark-lint (reported by Codacy)

Warn when shortcut reference links are used. Note

[no-shortcut-reference-link] Use the trailing [] on reference links
**Vulnerability:** Validation of URLs checks the resolved IP, but `httpx` re-resolves the domain during connection, allowing a TOCTOU (Time-of-Check Time-of-Use) attack where the IP changes to a private one (DNS Rebinding).
**Learning:** Standard URL validation is insufficient against sophisticated attackers controlling DNS. Checking the IP *after* the connection is established (using `stream.get_extra_info("server_addr")`) is a robust defense because it verifies the actual endpoint used.
**Prevention:** Always verify the peer/server address after connection establishment when making requests to untrusted or user-controlled URLs, especially when preventing access to internal resources.
2 changes: 1 addition & 1 deletion .python-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.13
3.13
19 changes: 19 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@
# res is (family, type, proto, canonname, sockaddr)
# sockaddr is (address, port) for AF_INET/AF_INET6
ip_str = res[4][0]
ip = ipaddress.ip_address(ip_str)

Check warning

Code scanning / Pylint (reported by Codacy)

Variable name "ip" doesn't conform to snake_case naming style Warning

Variable name "ip" doesn't conform to snake_case naming style
if not ip.is_global or ip.is_multicast:
log.warning(
f"Skipping unsafe URL (domain {hostname} resolves to non-global/multicast IP {ip}): {sanitize_for_log(url)}"
Expand Down Expand Up @@ -526,17 +526,36 @@
time.sleep(wait_time)


def _gh_get(url: str) -> Dict:
# First check: Quick check without holding lock for long
with _cache_lock:
if url in _cache:
return _cache[url]

# Fetch data if not cached
# Explicitly let HTTPError propagate (no need to catch just to re-raise)
with _gh.stream("GET", url) as r:
r.raise_for_status()

# SECURITY FIX: Post-connect SSRF verification (DNS Rebinding protection)
# Verify the actual IP address we connected to, to prevent DNS rebinding attacks.
stream = r.extensions.get("network_stream")
if stream:
# "server_addr" returns (ip, port) for TCP connections
server_addr = stream.get_extra_info("server_addr")
if server_addr and len(server_addr) >= 1:
ip_str = server_addr[0]
try:
ip = ipaddress.ip_address(ip_str)

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Variable name "ip" doesn't conform to snake_case naming style Warning

Variable name "ip" doesn't conform to snake_case naming style
except ValueError:
# server_addr[0] might not be an IP string (unlikely for TCP)
ip = None

Check warning

Code scanning / Pylint (reported by Codacy)

Variable name "ip" doesn't conform to snake_case naming style Warning

Variable name "ip" doesn't conform to snake_case naming style

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Variable name "ip" doesn't conform to snake_case naming style Warning

Variable name "ip" doesn't conform to snake_case naming style

if ip and (not ip.is_global or ip.is_multicast):
raise ValueError(
f"Security Alert: Domain resolved to private IP {ip_str} (DNS Rebinding protection)"

Check warning

Code scanning / Pylint (reported by Codacy)

Line too long (108/100) Warning

Line too long (108/100)

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Line too long (108/100) Warning

Line too long (108/100)
)
Comment on lines +548 to +557

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This try...except block currently fails open. If ipaddress.ip_address(ip_str) raises a ValueError (e.g., if ip_str is not a valid IP address), ip is set to None. The subsequent check if ip and ... will then be false, and the request processing will continue, bypassing the security validation.

For security-critical checks, it's crucial to fail closed. If the IP address cannot be parsed, we should treat it as a potential threat and abort the request. I suggest modifying this to raise an exception if parsing fails.

Suggested change
try:
ip = ipaddress.ip_address(ip_str)
except ValueError:
# server_addr[0] might not be an IP string (unlikely for TCP)
ip = None
if ip and (not ip.is_global or ip.is_multicast):
raise ValueError(
f"Security Alert: Domain resolved to private IP {ip_str} (DNS Rebinding protection)"
)
try:
ip = ipaddress.ip_address(ip_str)
except ValueError as e:
# Fail closed: if we can't parse the IP, we can't trust it.
raise ValueError(f"Security Alert: Could not parse server IP '{ip_str}' for validation.") from e
if not ip.is_global or ip.is_multicast:
raise ValueError(
f"Security Alert: Domain resolved to private IP {ip_str} (DNS Rebinding protection)"
)

Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding logging when the post-connect SSRF verification cannot be performed (e.g., when network_stream is not available in extensions). While the initial URL validation provides protection, logging would help detect if the defense-in-depth layer is being bypassed due to httpx implementation changes or unexpected conditions. For example, add a log.debug or log.warning when stream is None.

Suggested change
)
)
else:
# If httpx does not expose a network_stream, we cannot perform the
# post-connect SSRF verification; log this for observability.
log.warning(
"Post-connect SSRF verification skipped for %s: network_stream extension missing",
sanitize_for_log(url),
)

Copilot uses AI. Check for mistakes.

# 1. Check Content-Length header if present
cl = r.headers.get("Content-Length")
if cl:
Expand Down
70 changes: 70 additions & 0 deletions tests/test_ssrf_fix.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import pytest

Check warning

Code scanning / Prospector (reported by Codacy)

Unable to import 'pytest' (import-error) Warning test

Unable to import 'pytest' (import-error)

Check warning

Code scanning / Pylint (reported by Codacy)

Missing module docstring Warning test

Missing module docstring

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Missing module docstring Warning test

Missing module docstring
import httpx

Check warning

Code scanning / Prospector (reported by Codacy)

Unable to import 'httpx' (import-error) Warning test

Unable to import 'httpx' (import-error)
from unittest.mock import MagicMock

Check warning

Code scanning / Pylint (reported by Codacy)

standard import "from unittest.mock import MagicMock" should be placed before "import pytest" Warning test

standard import "from unittest.mock import MagicMock" should be placed before "import pytest"

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

standard import "from unittest.mock import MagicMock" should be placed before "import pytest" Warning test

standard import "from unittest.mock import MagicMock" should be placed before "import pytest"
import main

@pytest.fixture(autouse=True)
def clear_cache():

Check warning

Code scanning / Pylint (reported by Codacy)

Missing function docstring Warning test

Missing function docstring

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Missing function or method docstring Warning test

Missing function or method docstring
main._cache.clear()

Check warning

Code scanning / Prospector (reported by Codacy)

Access to a protected member _cache of a client class (protected-access) Warning test

Access to a protected member _cache of a client class (protected-access)

Check notice

Code scanning / Pylint (reported by Codacy)

Access to a protected member _cache of a client class Note test

Access to a protected member _cache of a client class

Check notice

Code scanning / Pylintpython3 (reported by Codacy)

Access to a protected member _cache of a client class Note test

Access to a protected member _cache of a client class
Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fixture should clear the cache after each test as well to ensure proper test isolation. Change the fixture to use yield:

@pytest.fixture(autouse=True)
def clear_cache():
    main._cache.clear()
    yield
    main._cache.clear()

This pattern is consistent with the setUp/tearDown pattern used in other test files like test_cache_optimization.py.

Suggested change
main._cache.clear()
main._cache.clear()
yield
main._cache.clear()

Copilot uses AI. Check for mistakes.

def test_gh_get_blocks_private_ip_after_connect(monkeypatch):
"""
Test that _gh_get raises ValueError if the connection was established to a private IP.
This simulates a DNS Rebinding attack where the initial check passes but the connection goes to private IP.

Check warning

Code scanning / Pylint (reported by Codacy)

Line too long (111/100) Warning test

Line too long (111/100)

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Line too long (111/100) Warning test

Line too long (111/100)
"""

# Mock response stream with private IP
mock_stream = MagicMock()
mock_stream.get_extra_info.return_value = ('127.0.0.1', 443)

mock_response = MagicMock(spec=httpx.Response)
mock_response.extensions = {"network_stream": mock_stream}
mock_response.headers = {}
mock_response.iter_bytes.return_value = [b'{}']
mock_response.raise_for_status.return_value = None

# Context manager mock for stream()
mock_context = MagicMock()
mock_context.__enter__.return_value = mock_response
mock_context.__exit__.return_value = None

# Mock _gh.stream
mock_gh = MagicMock()
mock_gh.stream.return_value = mock_context

monkeypatch.setattr(main, "_gh", mock_gh)

# We expect ValueError because of our security fix
# Before the fix, this test will FAIL (it won't raise)
with pytest.raises(ValueError, match="Security Alert: Domain resolved to private IP"):
main._gh_get("https://example.com/config.json")

Check warning

Code scanning / Prospector (reported by Codacy)

Access to a protected member _gh_get of a client class (protected-access) Warning test

Access to a protected member _gh_get of a client class (protected-access)

Check notice

Code scanning / Pylint (reported by Codacy)

Access to a protected member _gh_get of a client class Note test

Access to a protected member _gh_get of a client class

Check notice

Code scanning / Pylintpython3 (reported by Codacy)

Access to a protected member _gh_get of a client class Note test

Access to a protected member _gh_get of a client class

def test_gh_get_allows_public_ip_after_connect(monkeypatch):
"""
Test that _gh_get allows connection if established to a public IP.
"""

# Mock response stream with public IP
mock_stream = MagicMock()
mock_stream.get_extra_info.return_value = ('8.8.8.8', 443)

mock_response = MagicMock(spec=httpx.Response)
mock_response.extensions = {"network_stream": mock_stream}
mock_response.headers = {}
mock_response.iter_bytes.return_value = [b'{"valid": "json"}']
mock_response.raise_for_status.return_value = None

# Context manager mock
mock_context = MagicMock()
mock_context.__enter__.return_value = mock_response
mock_context.__exit__.return_value = None

# Mock _gh.stream
mock_gh = MagicMock()
mock_gh.stream.return_value = mock_context

monkeypatch.setattr(main, "_gh", mock_gh)

# Should not raise
result = main._gh_get("https://example.com/config.json")

Check warning

Code scanning / Prospector (reported by Codacy)

Access to a protected member _gh_get of a client class (protected-access) Warning test

Access to a protected member _gh_get of a client class (protected-access)

Check notice

Code scanning / Pylint (reported by Codacy)

Access to a protected member _gh_get of a client class Note test

Access to a protected member _gh_get of a client class

Check notice

Code scanning / Pylintpython3 (reported by Codacy)

Access to a protected member _gh_get of a client class Note test

Access to a protected member _gh_get of a client class
assert result == {"valid": "json"}

Check notice

Code scanning / Bandit

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To complement the proposed change in main.py and ensure the security check is robust, it would be beneficial to add a test case that verifies the system fails closed when the connected address is not a valid IP string. This ensures that unexpected return values from get_extra_info are handled securely.

    assert result == {"valid": "json"}

def test_gh_get_blocks_invalid_ip_after_connect(monkeypatch):
    """
    Test that _gh_get raises ValueError if the connection was established to an invalid IP.
    This ensures we fail-closed if the server address is not a parsable IP.
    """

    # Mock response stream with an invalid IP string
    mock_stream = MagicMock()
    mock_stream.get_extra_info.return_value = ('not-a-valid-ip', 443)

    mock_response = MagicMock(spec=httpx.Response)
    mock_response.extensions = {"network_stream": mock_stream}
    mock_response.headers = {}
    mock_response.iter_bytes.return_value = [b'{}']
    mock_response.raise_for_status.return_value = None

    # Context manager mock for stream()
    mock_context = MagicMock()
    mock_context.__enter__.return_value = mock_response
    mock_context.__exit__.return_value = None

    # Mock _gh.stream
    mock_gh = MagicMock()
    mock_gh.stream.return_value = mock_context

    monkeypatch.setattr(main, "_gh", mock_gh)

    # We expect ValueError because the IP is invalid and we should fail closed.
    with pytest.raises(ValueError, match="Security Alert: Could not parse server IP"):
        main._gh_get("https://example.com/config.json")

Check notice

Code scanning / Bandit (reported by Codacy)

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.
Comment on lines +1 to +70
Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding additional test cases to improve coverage of the SSRF fix:

  1. IPv6 loopback address (::1)
  2. IPv6 private address (e.g., fc00::1)
  3. Multicast IP address (e.g., 224.0.0.1)
  4. Case where network_stream is not available in extensions
  5. Case where server_addr is None or empty

These additional tests would help ensure the security fix handles all edge cases correctly.

Copilot uses AI. Check for mistakes.
12 changes: 11 additions & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading