diff --git a/subwiz/main.py b/subwiz/main.py index 0910589..954c154 100644 --- a/subwiz/main.py +++ b/subwiz/main.py @@ -233,7 +233,7 @@ def _counting_progress_dot(): return {str(dom) for dom in predictions} predictions_that_resolve = asyncio.run( - get_registered_domains(predictions, resolution_concurrency) + get_registered_domains(predictions, resolution_concurrency, apex_domain=apex) ) if not quiet: diff --git a/subwiz/resolve.py b/subwiz/resolve.py index 10beada..ff0fd53 100644 --- a/subwiz/resolve.py +++ b/subwiz/resolve.py @@ -3,9 +3,16 @@ This module provides asynchronous DNS resolution functionality to check whether domains are registered and resolve to IP addresses. It uses multiple nameservers for reliability and implements concurrency control for efficient batch processing. +Includes wildcard DNS detection to filter false positives on domains with +catch-all DNS records. """ +from __future__ import annotations + import asyncio +import random +import string +from typing import Optional import aiodns import idna.core @@ -16,28 +23,116 @@ NAME_SERVERS = ["1.1.1.1", "1.0.0.1", "8.8.8.8"] TIMEOUT = 3 TRIES = 1 +WILDCARD_PROBE_COUNT = 3 + + +async def _resolve_ips( + hostname: str, resolver: aiodns.DNSResolver +) -> frozenset[str]: + """Resolve a hostname to its set of IP addresses. + + Args: + hostname: The hostname to resolve + resolver: DNS resolver instance + + Returns: + Frozenset of IP address strings, or empty frozenset on failure + """ + try: + results = await resolver.query(hostname, "A") + return frozenset(r.host for r in results) + except (aiodns.error.DNSError, idna.IDNAError): + return frozenset() + + +async def detect_wildcard( + apex_domain: str, resolver: aiodns.DNSResolver +) -> Optional[frozenset[str]]: + """Detect whether an apex domain has a wildcard DNS record. + + Probes multiple random subdomains that are extremely unlikely to exist. + If every probe resolves and all share at least one IP, a wildcard is present. 
+ + Args: + apex_domain: The apex domain to check (e.g. "example.com") + resolver: DNS resolver instance + + Returns: + Frozenset of IPs shared by all probes if wildcard detected, None otherwise + """ + random_labels = [ + "".join(random.choices(string.ascii_lowercase + string.digits, k=16)) + for _ in range(WILDCARD_PROBE_COUNT) + ] + probe_hosts = [f"{label}.{apex_domain}" for label in random_labels] + + ip_sets = await asyncio.gather( + *[_resolve_ips(host, resolver) for host in probe_hosts] + ) + + # If any probe failed to resolve, there is no wildcard + if any(len(ips) == 0 for ips in ip_sets): + return None + + # All probes resolved — check if they share at least one common IP + common_ips = ip_sets[0] + for ips in ip_sets[1:]: + common_ips = common_ips & ips + + if common_ips: + return common_ips + + return None async def get_registered_domains( - domains_to_check: set[Domain], resolution_concurrency: int + domains_to_check: set[Domain], + resolution_concurrency: int, + apex_domain: Optional[str] = None, ) -> set[Domain]: """Check which domains from a set are registered and resolve to IP addresses. + When an apex_domain is provided, performs wildcard detection first and filters + out domains that resolve solely to wildcard IP addresses. 
+ Args: domains_to_check: Set of Domain objects to check for registration resolution_concurrency: Maximum number of concurrent DNS resolutions + apex_domain: Optional apex domain for wildcard detection Returns: - Set of Domain objects that are registered and resolve successfully + Set of Domain objects that are registered and resolve successfully, + excluding wildcard false positives """ - semaphore = asyncio.Semaphore(resolution_concurrency) resolver = aiodns.DNSResolver( nameservers=NAME_SERVERS, timeout=TIMEOUT, tries=TRIES ) + # Detect wildcard DNS before resolving predictions + wildcard_ips = None + if apex_domain: + wildcard_ips = await detect_wildcard(apex_domain, resolver) + + if wildcard_ips is None: + # No wildcard — use the original fast path (just check if registered) + domains_list = list(domains_to_check) + tasks = [dom.is_registered(resolver, semaphore) for dom in domains_to_check] + results = await asyncio.gather(*tasks) + return {dom for dom, is_reg in zip(domains_list, results) if is_reg} + + # Wildcard detected — resolve each domain and keep only those with + # at least one IP outside the wildcard set + async def _resolves_non_wildcard(domain: Domain) -> bool: + async with semaphore: + ips = await _resolve_ips(str(domain), resolver) + if not ips: + return False + # Keep the domain if it has any IP not in the wildcard set + return not ips.issubset(wildcard_ips) + domains_list = list(domains_to_check) - tasks = [dom.is_registered(resolver, semaphore) for dom in domains_to_check] + tasks = [_resolves_non_wildcard(dom) for dom in domains_list] results = await asyncio.gather(*tasks) - return {dom for dom, is_reg in zip(domains_list, results) if is_reg} + return {dom for dom, keep in zip(domains_list, results) if keep} diff --git a/tests/test_resolve.py b/tests/test_resolve.py index deb5e14..550594c 100644 --- a/tests/test_resolve.py +++ b/tests/test_resolve.py @@ -1,16 +1,19 @@ """Tests for DNS resolution functionality. 
This module contains tests that verify the DNS resolution and domain -registration checking works correctly for various domain inputs. +registration checking works correctly for various domain inputs, +including wildcard DNS detection and filtering. """ import asyncio -from subwiz.resolve import get_registered_domains +import aiodns + +from subwiz.resolve import detect_wildcard, get_registered_domains, NAME_SERVERS, TIMEOUT, TRIES from subwiz.type import Domain -def test_(): +def test_registered_domains(): """Test that DNS resolution correctly identifies registered domains. Verifies that the get_registered_domains function can distinguish between @@ -23,3 +26,29 @@ def test_(): get_registered_domains(input_domains, resolution_concurrency=10) ) assert registered_domains == {Domain("api.hadrian.io"), Domain("app.hadrian.io")} + + +def test_wildcard_detection_non_wildcard(): + """Test that wildcard detection returns None for non-wildcard domains.""" + + async def _check(): + resolver = aiodns.DNSResolver( + nameservers=NAME_SERVERS, timeout=TIMEOUT, tries=TRIES + ) + return await detect_wildcard("hadrian.io", resolver) + + # hadrian.io does not have a wildcard record + result = asyncio.run(_check()) + assert result is None + + +def test_registered_domains_with_apex(): + """Test that passing apex_domain still returns correct results for non-wildcard domains.""" + domain_strings = {"api.hadrian.io", "app.hadrian.io", "random_string.hadrian.io"} + input_domains = {Domain(dom) for dom in domain_strings} + registered_domains = asyncio.run( + get_registered_domains( + input_domains, resolution_concurrency=10, apex_domain="hadrian.io" + ) + ) + assert registered_domains == {Domain("api.hadrian.io"), Domain("app.hadrian.io")}