Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import concurrent.futures
import threading
import ipaddress
import socket
from urllib.parse import urlparse
from typing import Dict, List, Optional, Any, Set, Sequence

Expand Down Expand Up @@ -209,8 +210,21 @@
log.warning(f"Skipping unsafe URL (private IP): {sanitize_for_log(url)}")
return False
except ValueError:
# Not an IP literal, it's a domain.
pass
# Not an IP literal, it's a domain. Resolve it to prevent SSRF against internal services.

Check warning

Code scanning / Pylint (reported by Codacy)

Line too long (101/100) Warning

Line too long (101/100)

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Line too long (101/100) Warning

Line too long (101/100)
try:
# Use getaddrinfo to support both IPv4 and IPv6 and check all resolved addresses

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Variable name "e" doesn't conform to snake_case naming style Warning

Variable name "e" doesn't conform to snake_case naming style

Check notice

Code scanning / Pylintpython3 (reported by Codacy)

Catching too general exception Exception Note

Catching too general exception Exception
addr_info = socket.getaddrinfo(hostname, None)
for res in addr_info:
# res is (family, type, proto, canonname, sockaddr)
# sockaddr is (address, port) for IPv4 and (address, port, flow info, scope id) for IPv6

Check warning

Code scanning / Pylint (reported by Codacy)

Line too long (108/100) Warning

Line too long (108/100)

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Line too long (108/100) Warning

Line too long (108/100)
ip_str = res[4][0]
ip_obj = ipaddress.ip_address(ip_str)
if ip_obj.is_private or ip_obj.is_loopback:
log.warning(f"Skipping unsafe URL (domain resolves to private IP {ip_str}): {sanitize_for_log(url)}")

Check warning

Code scanning / Prospector (reported by Codacy)

Use lazy % formatting in logging functions (logging-fstring-interpolation) Warning

Use lazy % formatting in logging functions (logging-fstring-interpolation)

Check warning

Code scanning / Pylint (reported by Codacy)

Line too long (125/100) Warning

Line too long (125/100)

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Line too long (125/100) Warning

Line too long (125/100)

Check notice

Code scanning / Pylintpython3 (reported by Codacy)

Use lazy % formatting in logging functions Note

Use lazy % formatting in logging functions
return False
except Exception as e:
log.warning(f"Skipping unsafe URL (DNS resolution failed): {sanitize_for_log(url)} ({e})")

Check warning

Code scanning / Prospector (reported by Codacy)

Use lazy % formatting in logging functions (logging-fstring-interpolation) Warning

Use lazy % formatting in logging functions (logging-fstring-interpolation)

Check warning

Code scanning / Pylint (reported by Codacy)

Line too long (106/100) Warning

Line too long (106/100)

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Line too long (106/100) Warning

Line too long (106/100)

Check notice

Code scanning / Pylintpython3 (reported by Codacy)

Use lazy % formatting in logging functions Note

Use lazy % formatting in logging functions
return False

except Exception as e:
log.warning(f"Failed to validate URL {sanitize_for_log(url)}: {e}")
Expand Down Expand Up @@ -671,7 +685,7 @@
client # Pass the persistent client
): folder_data
for folder_data in folder_data_list
}

Check warning

Code scanning / Pylint (reported by Codacy)

Variable name "e" doesn't conform to snake_case naming style Warning

Variable name "e" doesn't conform to snake_case naming style

Check notice

Code scanning / Pylint (reported by Codacy)

Catching too general exception Exception Note

Catching too general exception Exception

for future in concurrent.futures.as_completed(future_to_folder):
folder_data = future_to_folder[future]
Expand Down
70 changes: 70 additions & 0 deletions tests/test_ssrf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import unittest

Check warning

Code scanning / Pylint (reported by Codacy)

Missing module docstring Warning test

Missing module docstring

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Missing module docstring Warning test

Missing module docstring
from unittest.mock import patch
import sys
import os
import socket

# Add parent directory to path to import main
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from main import validate_folder_url

Check warning

Code scanning / Prospector (reported by Codacy)

Import "from main import validate_folder_url" should be placed at the top of the module (wrong-import-position) Warning test

Import "from main import validate_folder_url" should be placed at the top of the module (wrong-import-position)

Check warning

Code scanning / Pylint (reported by Codacy)

Import "from main import validate_folder_url" should be placed at the top of the module Warning test

Import "from main import validate_folder_url" should be placed at the top of the module

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Import "from main import validate_folder_url" should be placed at the top of the module Warning test

Import "from main import validate_folder_url" should be placed at the top of the module

class TestSSRF(unittest.TestCase):

Check warning

Code scanning / Pylint (reported by Codacy)

Missing class docstring Warning test

Missing class docstring

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Missing class docstring Warning test

Missing class docstring
def test_localhost_literal(self):
"""Test that explicit localhost strings are rejected."""
self.assertFalse(validate_folder_url("https://localhost/config.json"))
self.assertFalse(validate_folder_url("https://127.0.0.1/config.json"))
self.assertFalse(validate_folder_url("https://[::1]/config.json"))

@patch('socket.getaddrinfo')
def test_private_ipv4_resolution(self, mock_getaddrinfo):
"""Test that domains resolving to private IPv4 are rejected."""
# mock returns list of (family, type, proto, canonname, sockaddr)
mock_getaddrinfo.return_value = [
(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('192.168.1.1', 0))
]
url = "https://internal.private/config.json"

self.assertFalse(validate_folder_url(url), "Should reject domain resolving to private IPv4")

@patch('socket.getaddrinfo')
def test_private_ipv6_resolution(self, mock_getaddrinfo):
"""Test that domains resolving to private IPv6 are rejected."""
mock_getaddrinfo.return_value = [
(socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('fd00::1', 0, 0, 0))
]
url = "https://internal6.private/config.json"

self.assertFalse(validate_folder_url(url), "Should reject domain resolving to private IPv6")

@patch('socket.getaddrinfo')
def test_mixed_resolution_unsafe(self, mock_getaddrinfo):
"""Test that if ANY resolved IP is private, it is rejected."""
mock_getaddrinfo.return_value = [
(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('8.8.8.8', 0)),
(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('192.168.1.1', 0))
]
url = "https://mixed.private/config.json"

self.assertFalse(validate_folder_url(url), "Should reject if any IP is private")

@patch('socket.getaddrinfo')
def test_public_resolution(self, mock_getaddrinfo):
"""Test that domains resolving to only public IPs are accepted."""
mock_getaddrinfo.return_value = [
(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('8.8.8.8', 0))
]
url = "https://google.com/config.json"

self.assertTrue(validate_folder_url(url), "Should accept domain resolving to public IP")

@patch('socket.getaddrinfo')
def test_dns_resolution_failure(self, mock_getaddrinfo):
"""Test that domains failing resolution are rejected."""
mock_getaddrinfo.side_effect = Exception("DNS lookup failed")
url = "https://nonexistent.domain/config.json"

self.assertFalse(validate_folder_url(url), "Should reject domain that fails resolution")

if __name__ == '__main__':

Check warning

Code scanning / Prospector (reported by Codacy)

expected 2 blank lines after class or function definition, found 1 (E305) Warning test

expected 2 blank lines after class or function definition, found 1 (E305)
unittest.main()
Loading