Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions multiaddr/codecs/certhash.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any
from typing import Any, cast

import multibase
import multihash
Expand Down Expand Up @@ -54,7 +54,7 @@ def to_bytes(self, proto: Any, string: str) -> bytes:
"""
try:
# Decode the multibase string to get the raw multihash bytes.
decoded_bytes = multibase.decode(string)
decoded_bytes = cast(bytes, multibase.decode(string))
except Exception as e:
raise ValueError(f"Failed to decode multibase string: {string}") from e

Expand Down
15 changes: 13 additions & 2 deletions multiaddr/resolvers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
"""DNS resolution support for multiaddr."""

from .base import Resolver
from .dns import DNSResolver
from .dns import DNSADDR_TXT_PREFIX, DNSResolver
from .util import addr_len, fqdn, is_fqdn, matches, offset_addr, resolve_all

__all__ = ["DNSResolver", "Resolver"]
__all__ = [
"DNSADDR_TXT_PREFIX",
"DNSResolver",
"Resolver",
"addr_len",
"fqdn",
"is_fqdn",
"matches",
"offset_addr",
"resolve_all",
]
9 changes: 6 additions & 3 deletions multiaddr/resolvers/dns.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
from ..protocols import P_DNS, P_DNS4, P_DNS6, P_DNSADDR, Protocol
from .base import Resolver

DNSADDR_TXT_PREFIX = "dnsaddr="
"""Prefix for dnsaddr TXT records (e.g. ``dnsaddr=/ip4/...``)."""


class DNSResolver(Resolver):
"""
Expand Down Expand Up @@ -226,8 +229,8 @@ async def _query_dnsaddr_txt_records(
else:
txt_data = str(txt_data_raw)
logging.debug(f"{indent} TXT: {txt_data}")
if txt_data.startswith("dnsaddr="):
multiaddr_str = txt_data[8:]
if txt_data.startswith(DNSADDR_TXT_PREFIX):
multiaddr_str = txt_data[len(DNSADDR_TXT_PREFIX) :]
multiaddr_str = self._clean_quotes(multiaddr_str).strip()
logging.debug(f"{indent} Parsed multiaddr: {multiaddr_str}")
if not multiaddr_str:
Expand Down Expand Up @@ -405,4 +408,4 @@ async def _resolve_dns_with_stack(
return results


__all__ = ["DNSResolver"]
__all__ = ["DNSADDR_TXT_PREFIX", "DNSResolver"]
97 changes: 97 additions & 0 deletions multiaddr/resolvers/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"""Utility functions for multiaddr resolution."""

from __future__ import annotations

from typing import TYPE_CHECKING

from ..exceptions import RecursionLimitError
from ..multiaddr import Multiaddr
from ..protocols import P_DNS, P_DNS4, P_DNS6, P_DNSADDR

if TYPE_CHECKING:
from .base import Resolver

__all__ = ["addr_len", "fqdn", "is_fqdn", "matches", "offset_addr", "resolve_all"]

_DNS_PROTOCOLS = frozenset({P_DNS, P_DNS4, P_DNS6, P_DNSADDR})


def is_fqdn(s: str) -> bool:
"""Check if string is a fully qualified domain name (ends with unescaped dot)."""
if not s:
return False
# Count trailing backslashes before the final character
if s[-1] != ".":
return False
# Check if the trailing dot is escaped
num_backslashes = 0
for i in range(len(s) - 2, -1, -1):
if s[i] == "\\":
num_backslashes += 1
else:
break
# Odd number of backslashes means the dot is escaped
return num_backslashes % 2 == 0


def fqdn(s: str) -> str:
"""Append trailing dot if not already a FQDN."""
if is_fqdn(s):
return s
return s + "."


def addr_len(maddr: Multiaddr) -> int:
"""Count the number of protocol components in a multiaddr."""
return len(list(maddr.protocols()))


def offset_addr(maddr: Multiaddr, n: int) -> Multiaddr:
"""Return a new multiaddr with the first n protocol components removed."""
parts = maddr.split(n)
if len(parts) <= n:
return Multiaddr("/")
return parts[n]


def matches(maddr: Multiaddr) -> bool:
"""Check if a multiaddr contains any DNS protocol component."""
return any(p.code in _DNS_PROTOCOLS for p in maddr.protocols())


async def resolve_all(
resolver: Resolver,
maddr: Multiaddr,
*,
max_iterations: int = 32,
) -> list[Multiaddr]:
"""Resolve all DNS components in a multiaddr iteratively.

Calls resolver.resolve() repeatedly until no DNS components remain.
"""
queue = [maddr]
resolved: list[Multiaddr] = []

for _ in range(max_iterations):
if not queue:
break
next_queue: list[Multiaddr] = []
for addr in queue:
if not matches(addr):
resolved.append(addr)
continue
results = await resolver.resolve(addr)
for r in results:
if matches(r):
next_queue.append(r)
else:
resolved.append(r)
queue = next_queue

if queue:
raise RecursionLimitError(
f"resolve_all exceeded {max_iterations} iterations; "
f"{len(queue)} addresses still contain DNS components"
)

return resolved
5 changes: 3 additions & 2 deletions tests/test_multiaddr.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@
"/garlic32/566niximlxdzpanmn4qouucvua3k7neniwss47li5r6ugoertzu:80",
"/garlic32/566niximlxdzpanmn4qouucvua3k7neniwss47li5r6ugoertzuq:-1",
"/garlic32/566niximlxdzpanmn4qouucvua3k7neniwss47li5r6ugoertzu@",
"/ip4/127.0.0.1/udp/1234/quic-v1/webtransport/certhash/b2uaraocy6yrdblb4sfptaddgimjmmpy",
"/ip4/127.0.0.1/udp/1234/quic-v1/webtransport/certhash/b2uaraocy6yrdblb4sfptaddgimjmmpy/certhash/zQmbWTwYGcmdyK9CYfNBcfs9nhZs17a6FQ4Y8oea278xx41",
"/ip4/127.0.0.1/udp/1234/quic-v1/webtransport/certhash/b2uaraocy6yrdblb4sfptaddgimjmmp",
"/udp/1234/sctp",
"/udp/1234/udt/1234",
"/udp/1234/utp/1234",
Expand Down Expand Up @@ -142,6 +141,8 @@ def test_invalid(addr_str):
"/ip4/127.0.0.1/tcp/127/webrtc",
"/certhash/uEiDDq4_xNyDorZBH3TlGazyJdOWSwvo4PUo5YHFMrvDE8g"
"/ip4/127.0.0.1/udp/9090/webrtc-direct/certhash/uEiDDq4_xNyDorZBH3TlGazyJdOWSwvo4PUo5YHFMrvDE8g",
"/ip4/127.0.0.1/udp/1234/quic-v1/webtransport/certhash/u1QEQOFj2IjCsPJFfMAxmQxLGPw",
"/ip4/127.0.0.1/udp/1234/quic-v1/webtransport/certhash/u1QEQOFj2IjCsPJFfMAxmQxLGPw/certhash/uEiDDq4_xNyDorZBH3TlGazyJdOWSwvo4PUo5YHFMrvDE8g",
],
) # nopep8
def test_valid(addr_str):
Expand Down
Loading