diff --git a/libp2p/relay/circuit_v2/protocol.py b/libp2p/relay/circuit_v2/protocol.py index ca7cb6da3..644bcc218 100644 --- a/libp2p/relay/circuit_v2/protocol.py +++ b/libp2p/relay/circuit_v2/protocol.py @@ -688,14 +688,27 @@ async def _handle_connect(self, stream: INetStream, msg: HopMessage) -> None: await stream.reset() return - # Check resource limits - if not self.resource_manager.can_accept_connection(peer_id=source_addr): + if not self.resource_manager.can_accept_connection(peer_id=peer_id): + relay_envelope_bytes, _ = env_to_send_in_RPC(self.host) + relay_envelope = unmarshal_envelope(relay_envelope_bytes) + await self._send_status( + stream, + StatusCode.NO_RESERVATION, + "Destination peer has no active reservation on this relay", + relay_envelope, + ) + await stream.reset() + return + + # Separately enforce the source peer's per-reservation connection limit. + source_reservation = self.resource_manager._reservations.get(source_addr) + if source_reservation and not source_reservation.can_accept_connection(): relay_envelope_bytes, _ = env_to_send_in_RPC(self.host) relay_envelope = unmarshal_envelope(relay_envelope_bytes) await self._send_status( stream, StatusCode.RESOURCE_LIMIT_EXCEEDED, - "Connection limit exceeded", + "Source peer has exceeded its connection limit", relay_envelope, ) await stream.reset() diff --git a/libp2p/relay/circuit_v2/protocol_buffer.py b/libp2p/relay/circuit_v2/protocol_buffer.py index 509cea1c6..b141e5180 100644 --- a/libp2p/relay/circuit_v2/protocol_buffer.py +++ b/libp2p/relay/circuit_v2/protocol_buffer.py @@ -21,6 +21,7 @@ class StatusCode(IntEnum): RESERVATION_REFUSED = 100 RESOURCE_LIMIT_EXCEEDED = 101 PERMISSION_DENIED = 102 + NO_RESERVATION = 103 CONNECTION_FAILED = 200 DIAL_REFUSED = 201 STOP_FAILED = 300 diff --git a/newsfragments/1342.bugfix.rst b/newsfragments/1342.bugfix.rst new file mode 100644 index 000000000..fc1c52c02 --- /dev/null +++ b/newsfragments/1342.bugfix.rst @@ -0,0 +1 @@ +Fixed Circuit Relay v2 issue where a source peer could connect to a destination peer without an active reservation on the relay. diff --git a/tests/core/relay/test_circuit_v2_transport.py b/tests/core/relay/test_circuit_v2_transport.py index 34ecabe8c..d08a9bc26 100644 --- a/tests/core/relay/test_circuit_v2_transport.py +++ b/tests/core/relay/test_circuit_v2_transport.py @@ -21,6 +21,7 @@ from libp2p.peer.peerinfo import ( PeerInfo, ) +from libp2p.peer.peerstore import env_to_send_in_RPC from libp2p.relay.circuit_v2.config import RelayConfig, RelayRole from libp2p.relay.circuit_v2.discovery import ( RelayDiscovery, @@ -373,7 +374,24 @@ async def app_echo_handler(stream): await trio.sleep(SLEEP_TIME) - # Step 2: Source connects to Relay + # Step 2: Destination makes a reservation on the relay. + with trio.fail_after(CONNECT_TIMEOUT): + dest_relay_stream = await target_host.new_stream( + relay_host.get_id(), [PROTOCOL_ID] + ) + envelope_bytes, _ = env_to_send_in_RPC(target_host) + reserve_msg = HopMessage( + type=HopMessage.RESERVE, + peer=target_host.get_id().to_bytes(), + senderRecord=envelope_bytes, + ) + await dest_relay_stream.write(reserve_msg.SerializeToString()) + # Read and discard the STATUS response from the relay + await dest_relay_stream.read(1024) + + await trio.sleep(SLEEP_TIME) + + # Step 3: Source connects to Relay with trio.fail_after(CONNECT_TIMEOUT): await connect(client_host, relay_host) assert relay_host.get_id() in client_host.get_network().connections @@ -383,7 +401,7 @@ async def app_echo_handler(stream): relay_id = relay_host.get_id() client_discovery.get_relay = lambda: relay_id - # Step 3: Source tries to dial the destination via p2p-circuit and opens stream + # Step 4: Source tries to dial the destination via p2p-circuit and opens stream relay_addr = relay_host.get_addrs()[0] dest_id = target_host.get_id() p2p_circuit_addr = Multiaddr(f"{relay_addr}/p2p-circuit/p2p/{dest_id}")