diff --git a/CHANGELOG.md b/CHANGELOG.md index 8686643a..79c20d39 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,19 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ## [Unreleased] +## [0.6.2] - 2026-04-07 + +### Fixed +- **Manual reconnect and introduced-peer connect now prefer better public paths more consistently** — the user-triggered reconnect and introduced-peer dial routes now try discovered and public or tunnel endpoints before stale remembered LAN addresses, which makes operator-initiated recovery behave more like the improved background reconnect path. +- **Expected reconnect and probe churn is less noisy in routine logs** — per-attempt outbound connect lines, connect timeouts, missing peer IDs from pre-handshake probes, and broker requests that simply were not routed immediately are now logged more quietly so real connection failures stand out better during catchup and internet-peer debugging. + +## [0.6.1] - 2026-04-07 + +### Fixed +- **Expected websocket handshake churn is quieter in logs** — early disconnects that happen before a peer finishes the websocket opening handshake no longer spray a scary library-level stack trace into server logs when the underlying condition is just a dropped or non-Canopy client connection. +- **Reconnect now prefers learned public or tunnel endpoints over stale private addresses** — once Canopy has a reusable public callback path for a peer, stored reconnect attempts stop burning retries on older `192.168.*` endpoints before trying the internet-reachable address. +- **Introduced-peer brokering is more truthful and useful for internet peers** — broker-only connect flows now require an actually connected broker candidate instead of treating historical introducers as live paths, and broker requests reuse the same advertised public or tunnel endpoints learned by the handshake path. + ## [0.6.0] - 2026-04-06 ### Changed diff --git a/canopy/__init__.py b/canopy/__init__.py index cfe1933b..26474ecc 100644 --- a/canopy/__init__.py +++ b/canopy/__init__.py @@ -8,7 +8,7 @@ License: Apache 2.0 """ -__version__ = "0.6.0" +__version__ = "0.6.2" __protocol_version__ = 1 __author__ = "Canopy Contributors" __license__ = "Apache-2.0" diff --git a/canopy/api/routes.py b/canopy/api/routes.py index 282e2ef2..ef7f014a 100644 --- a/canopy/api/routes.py +++ b/canopy/api/routes.py @@ -3107,7 +3107,18 @@ def import_p2p_invite(): if not connected: result['status'] = 'imported_not_connected' - result['message'] = 'Peer registered but could not connect to any endpoint. Make sure the peer is online and reachable.' + if not list(result.get('endpoints') or []): + result['diagnostic_code'] = 'invite_has_no_usable_endpoints' + result['message'] = ( + 'Peer registered, but the invite did not contain any usable direct endpoints. ' + 'Ask the remote peer to regenerate the invite with a public or tunnel endpoint.' + ) + else: + result['diagnostic_code'] = 'direct_connect_failed' + result['message'] = ( + 'Peer registered but could not connect to any advertised endpoint. ' + 'The peer may be offline, the endpoint may be stale, or the connection may require broker/relay help.' + ) _record_connection_event( p2p_manager, invite.peer_id, @@ -3178,17 +3189,70 @@ def connect_introduced_peer(): if not intro: return jsonify({'error': 'Peer not found in introduced list'}), 404 - endpoints = intro.get('endpoints', []) - if not endpoints: - return jsonify({'error': 'No endpoints available for this peer'}), 400 - ev_loop = p2p_manager._event_loop if not ev_loop or ev_loop.is_closed(): return jsonify({'error': 'P2P event loop unavailable'}), 500 - from ..network.invite import parse_invite_endpoint + from ..network.invite import canonicalize_invite_endpoint, parse_invite_endpoint + import ipaddress + + def _endpoint_priority(endpoint: str) -> tuple[int, str]: + parsed = parse_invite_endpoint(endpoint) + if not parsed: + return (99, endpoint) + host, _, scheme = parsed + text = str(host or '').strip().lower() + if not text: + return (99, endpoint) + if text == 'localhost' or text.startswith('127.'): + return (3, endpoint) + try: + ip = ipaddress.ip_address(text) + except ValueError: + return (0 if scheme == 'wss' else 1, endpoint) + if ip.is_loopback or ip.is_unspecified: + return (3, endpoint) + if ip.is_private or ip.is_link_local: + return (2, endpoint) + return (0 if scheme == 'wss' else 1, endpoint) direct_attempt_count = 0 + discovered_endpoints: list[str] = [] + non_discovered_endpoints: list[str] = [] + seen_endpoints: set[str] = set() + for group in ( + intro.get('endpoints', []), + getattr(getattr(p2p_manager, 'identity_manager', None), 'peer_endpoints', {}).get(peer_id, []), + ): + if not isinstance(group, list): + continue + for endpoint in group: + canon = canonicalize_invite_endpoint(endpoint) + if not canon or canon in seen_endpoints: + continue + seen_endpoints.add(canon) + non_discovered_endpoints.append(canon) + get_discovered = getattr(p2p_manager, '_get_discovered_peer_endpoints', None) + if callable(get_discovered): + try: + for endpoint in list(get_discovered(peer_id) or []): + canon = canonicalize_invite_endpoint(endpoint) + if not canon or canon in seen_endpoints: + continue + seen_endpoints.add(canon) + discovered_endpoints.append(canon) + except Exception: + pass + endpoints = discovered_endpoints + sorted(non_discovered_endpoints, key=_endpoint_priority) + + broker_candidates = [] + get_broker_candidates = getattr(p2p_manager, 'get_introduced_peer_broker_candidates', None) + if callable(get_broker_candidates): + try: + broker_candidates = list(get_broker_candidates(peer_id) or []) + except Exception: + broker_candidates = [] + if force_broker: _record_connection_event( p2p_manager, @@ -3196,7 +3260,7 @@ def connect_introduced_peer(): status='forced_failover', detail='Direct connect skipped by caller; testing broker/relay path', ) - else: + elif endpoints: for ep in endpoints: direct_attempt_count += 1 try: @@ -3240,53 +3304,53 @@ def connect_introduced_peer(): except Exception as ce: logger.warning(f"Connect to introduced {ep} failed: {ce}") continue + else: + _record_connection_event( + p2p_manager, + peer_id, + status='broker_only', + detail='No direct endpoints announced; trying broker path', + ) # Direct connection failed — try connection brokering. # Prefer connected introducers, then other connected peers as fallback. attempted_brokers: list[str] = [] if p2p_manager.relay_policy != 'off': - broker_candidates: list[str] = [] - seen_brokers: set[str] = set() - - connected_peers: list[str] = [] - try: - connected_peers = list(p2p_manager.get_connected_peers() or []) - except Exception: - connected_peers = [] - connected_set = set(connected_peers) + if not broker_candidates: + seen_brokers: set[str] = set() + connected_peers: list[str] = [] + try: + connected_peers = list(p2p_manager.get_connected_peers() or []) + except Exception: + connected_peers = [] + connected_set = set(connected_peers) - local_peer_id = '' - try: - local_peer_id = p2p_manager.get_peer_id() or '' - except Exception: local_peer_id = '' - - introducers: list[str] = [] - introduced_via = intro.get('introduced_via', []) - if isinstance(introduced_via, list): - for pid in introduced_via: - if isinstance(pid, str) and pid: - introducers.append(pid) - introduced_by = intro.get('introduced_by') - if isinstance(introduced_by, str) and introduced_by: - introducers.append(introduced_by) - - connected_introducers = [pid for pid in introducers if pid in connected_set] - disconnected_introducers = [pid for pid in introducers if pid not in connected_set] - - for pid in connected_introducers: - if pid not in seen_brokers: - seen_brokers.add(pid) - broker_candidates.append(pid) - - for pid in connected_peers: - if not pid or pid == peer_id or pid == local_peer_id or pid in seen_brokers: - continue - seen_brokers.add(pid) - broker_candidates.append(pid) - - for pid in disconnected_introducers: - if pid not in seen_brokers: + try: + local_peer_id = p2p_manager.get_peer_id() or '' + except Exception: + local_peer_id = '' + + introducers: list[str] = [] + introduced_via = intro.get('introduced_via', []) + if isinstance(introduced_via, list): + for pid in introduced_via: + if isinstance(pid, str) and pid: + introducers.append(pid) + introduced_by = intro.get('introduced_by') + if isinstance(introduced_by, str) and introduced_by: + introducers.append(introduced_by) + + connected_introducers = [pid for pid in introducers if pid in connected_set] + + for pid in connected_introducers: + if pid not in seen_brokers: + seen_brokers.add(pid) + broker_candidates.append(pid) + + for pid in connected_peers: + if not pid or pid == peer_id or pid == local_peer_id or pid in seen_brokers: + continue seen_brokers.add(pid) broker_candidates.append(pid) @@ -3315,9 +3379,13 @@ def connect_introduced_peer(): 'via_peer': broker_peer, 'attempted_brokers': attempted_brokers, 'forced_failover': force_broker, - 'direct_attempted': not force_broker, + 'direct_attempted': bool(endpoints) and not force_broker, 'direct_attempt_count': direct_attempt_count, + 'diagnostic_code': 'introduced_peer_broker_connect', 'message': ( + 'No direct endpoints were announced; broker request sent. ' + 'The introducer or relay peer will attempt to connect the target back.' + if not endpoints else 'Direct connection failed; broker request sent. ' 'The target peer will attempt to connect back. ' 'If both peers remain unreachable, use a broker with Full Relay enabled.' @@ -3330,7 +3398,26 @@ def connect_introduced_peer(): status='failed', detail='Introduced peer connection failed', ) - guidance = 'Could not connect to any endpoint' + if not endpoints: + guidance = ( + 'This introduced peer does not currently advertise any direct endpoints. ' + 'Keep the introducer online, retry through a broker, or import a fresh raw invite from the target peer.' + ) + if attempted_brokers: + guidance += f" ({len(attempted_brokers)} broker{'s' if len(attempted_brokers) != 1 else ''} tried)" + return jsonify({ + 'status': 'failed', + 'error': 'No endpoints available for this introduced peer record', + 'diagnostic_code': 'introduced_peer_has_no_direct_endpoints', + 'message': guidance, + 'attempted_brokers': attempted_brokers, + 'relay_policy': getattr(p2p_manager, 'relay_policy', 'broker_only'), + 'forced_failover': force_broker, + 'direct_attempted': False, + 'direct_attempt_count': 0, + }), 502 + + guidance = 'Could not connect to any advertised endpoint' if attempted_brokers: guidance += f" and no broker succeeded ({len(attempted_brokers)} attempted)" if p2p_manager.relay_policy != 'full_relay': @@ -3341,10 +3428,11 @@ def connect_introduced_peer(): return jsonify({ 'status': 'failed', 'message': guidance, + 'diagnostic_code': 'introduced_peer_direct_connect_failed', 'attempted_brokers': attempted_brokers, 'relay_policy': getattr(p2p_manager, 'relay_policy', 'broker_only'), 'forced_failover': force_broker, - 'direct_attempted': not force_broker, + 'direct_attempted': bool(endpoints) and not force_broker, 'direct_attempt_count': direct_attempt_count, }), 502 @@ -3386,7 +3474,57 @@ def reconnect_known_peer(): }) im = p2p_manager.identity_manager - endpoints = im.peer_endpoints.get(peer_id, []) + endpoints: list[str] = [] + seen_endpoints: set[str] = set() + if not endpoints: + stored_endpoints = list(im.peer_endpoints.get(peer_id, []) or []) + discovered_endpoints: list[str] = [] + get_discovered = getattr(p2p_manager, '_get_discovered_peer_endpoints', None) + if callable(get_discovered): + try: + discovered_endpoints = list(get_discovered(peer_id) or []) + except Exception: + discovered_endpoints = [] + + from ..network.invite import canonicalize_invite_endpoint, parse_invite_endpoint + import ipaddress + + def _endpoint_priority(endpoint: str) -> tuple[int, str]: + parsed = parse_invite_endpoint(endpoint) + if not parsed: + return (99, endpoint) + host, _, scheme = parsed + text = str(host or '').strip().lower() + if not text: + return (99, endpoint) + if text == 'localhost' or text.startswith('127.'): + return (3, endpoint) + try: + ip = ipaddress.ip_address(text) + except ValueError: + return (0 if scheme == 'wss' else 1, endpoint) + if ip.is_loopback or ip.is_unspecified: + return (3, endpoint) + if ip.is_private or ip.is_link_local: + return (2, endpoint) + return (0 if scheme == 'wss' else 1, endpoint) + + for endpoint in discovered_endpoints: + canon = canonicalize_invite_endpoint(endpoint) + if not canon or canon in seen_endpoints: + continue + seen_endpoints.add(canon) + endpoints.append(canon) + + stored_canonical = [] + for endpoint in stored_endpoints: + canon = canonicalize_invite_endpoint(endpoint) + if not canon or canon in seen_endpoints: + continue + seen_endpoints.add(canon) + stored_canonical.append(canon) + endpoints.extend(sorted(stored_canonical, key=_endpoint_priority)) + if not endpoints: return jsonify({'error': 'No known endpoints for this peer'}), 400 diff --git a/canopy/network/connection.py b/canopy/network/connection.py index ba54fd9b..c9840538 100644 --- a/canopy/network/connection.py +++ b/canopy/network/connection.py @@ -21,6 +21,22 @@ logger = logging.getLogger('canopy.network.connection') + +class _ExpectedWebSocketServerNoiseFilter(logging.Filter): + """Suppress noisy early-disconnect errors from the websockets server logger.""" + + def filter(self, record: logging.LogRecord) -> bool: + if record.levelno < logging.ERROR: + return True + if record.getMessage() != "opening handshake failed": + return True + exc = record.exc_info[1] if record.exc_info else None + return not isinstance(exc, websockets.exceptions.ConnectionClosedError) + + +_ws_server_logger = logging.getLogger('canopy.network.connection.websockets_server') +_ws_server_logger.addFilter(_ExpectedWebSocketServerNoiseFilter()) + _EXPECTED_CONNECT_ERRNOS = {51, 61, 64, 65, 110, 111, 113} @@ -156,6 +172,7 @@ class PeerConnection: canopy_version: Optional[str] = None protocol_version: Optional[int] = None endpoint_uri: Optional[str] = None + advertised_endpoints: List[str] = field(default_factory=list) failure_reason: Optional[str] = None failure_detail: Optional[str] = None _send_lock: asyncio.Lock = field(default_factory=asyncio.Lock, repr=False) @@ -190,6 +207,7 @@ def __init__(self, local_peer_id: str, identity_manager: Any, tls_key_path: Optional[str] = None, enable_tls: bool = False, handshake_capabilities: Optional[List[str]] = None, + advertised_endpoints_provider: Optional[Callable[[], List[str]]] = None, canopy_version: str = "0.1.0", protocol_version: int = 1, reject_protocol_mismatch: bool = False): @@ -214,6 +232,7 @@ def __init__(self, local_peer_id: str, identity_manager: Any, cap for cap in (str(item).strip() for item in base_capabilities) if cap ] or ['chat', 'files', 'voice'] + self.advertised_endpoints_provider = advertised_endpoints_provider self.local_canopy_version = str(canopy_version or '0.1.0').strip() or '0.1.0' self.local_protocol_version = self._coerce_protocol_version(protocol_version, default=1) self.reject_protocol_mismatch = bool(reject_protocol_mismatch) @@ -451,7 +470,40 @@ def _signed_optional_handshake_fields(payload: Dict[str, Any]) -> Dict[str, Any] extra['canopy_version'] = payload.get('canopy_version') if 'protocol_version' in payload: extra['protocol_version'] = payload.get('protocol_version') + if 'advertised_endpoints' in payload: + extra['advertised_endpoints'] = payload.get('advertised_endpoints') return extra + + @staticmethod + def _normalize_endpoint_payload(raw: Any) -> List[str]: + """Normalize a handshake endpoint payload into a trimmed string list.""" + if isinstance(raw, str): + raw_values = [raw] + elif isinstance(raw, (list, tuple, set)): + raw_values = list(raw) + else: + raw_values = [] + + out: List[str] = [] + seen = set() + for endpoint in raw_values: + text = str(endpoint or '').strip() + if not text or text in seen: + continue + seen.add(text) + out.append(text) + return out + + def _get_advertised_endpoints(self) -> List[str]: + """Return the local node's self-advertised dial-back endpoints.""" + provider = self.advertised_endpoints_provider + if not callable(provider): + return [] + try: + return self._normalize_endpoint_payload(provider() or []) + except Exception as exc: + logger.debug("Could not compute handshake advertised_endpoints: %s", exc) + return [] async def start(self) -> None: """Start connection manager and WebSocket server.""" @@ -473,6 +525,7 @@ async def start(self) -> None: ping_timeout=None, max_size=20 * 1024 * 1024, # 20MB — allow P2P image transfer compression="deflate", # permessage-deflate — ~40-70% savings on JSON frames + logger=_ws_server_logger, ) if self.enable_tls and self._server_ssl: serve_kwargs['ssl'] = self._server_ssl @@ -566,7 +619,7 @@ async def connect_to_peer(self, peer_id: str, address: str, port: int, scheme: O if normalized_scheme not in ('ws', 'wss'): normalized_scheme = '' - logger.info(f"Connecting to peer {peer_id} at {address}:{port}...") + logger.debug(f"Connecting to peer {peer_id} at {address}:{port}...") # Create connection object connection = PeerConnection( @@ -645,7 +698,7 @@ async def connect_to_peer(self, peer_id: str, address: str, port: int, scheme: O except (TimeoutError, asyncio.TimeoutError): # Expected when trying stale/unreachable addresses; we try other addresses or retry - logger.info( + logger.debug( f"Connection to {peer_id} at {address}:{port} timed out " "(will try other addresses or retry)" ) @@ -681,7 +734,8 @@ async def _handle_incoming_connection(self, websocket: Any, path: Optional[str] websocket: WebSocket connection path: Request path (provided by some websockets versions) """ - logger.info(f"Incoming connection from {websocket.remote_address}") + remote_address = getattr(websocket, 'remote_address', None) + logger.debug(f"Incoming connection from {remote_address}") try: # First message should be handshake with peer ID and signature @@ -690,7 +744,7 @@ async def _handle_incoming_connection(self, websocket: Any, path: Optional[str] peer_id = handshake_data.get('peer_id') if not peer_id: - logger.warning("Handshake missing peer_id") + logger.debug("Handshake missing peer_id from %s", remote_address) await websocket.close() return @@ -777,6 +831,9 @@ async def _handle_incoming_connection(self, websocket: Any, path: Optional[str] handshake_data.get('capabilities', []) ) } + connection.advertised_endpoints = self._normalize_endpoint_payload( + handshake_data.get('advertised_endpoints', []) + ) remote_protocol = connection.protocol_version or 1 remote_canopy = str(connection.canopy_version or '0.1.0') @@ -822,6 +879,7 @@ async def _handle_incoming_connection(self, websocket: Any, path: Optional[str] 'canopy_version': connection.canopy_version, 'protocol_version': connection.protocol_version, 'capabilities': list(connection.capabilities or {}), + 'advertised_endpoints': list(connection.advertised_endpoints or []), } self.on_peer_authenticated(peer_id, peer_meta) except TypeError: @@ -836,7 +894,7 @@ async def _handle_incoming_connection(self, websocket: Any, path: Optional[str] await websocket.close() except asyncio.TimeoutError: - logger.warning("Handshake timeout") + logger.debug("Handshake timeout from %s", remote_address) try: await websocket.close() except Exception: @@ -893,6 +951,7 @@ async def _perform_handshake(self, connection: PeerConnection) -> bool: **signed_payload, 'canopy_version': self.local_canopy_version, 'protocol_version': self.local_protocol_version, + 'advertised_endpoints': self._get_advertised_endpoints(), 'signature': signature.hex() } @@ -1010,6 +1069,9 @@ async def _perform_handshake(self, connection: PeerConnection) -> bool: response.get('capabilities', []) ) } + connection.advertised_endpoints = self._normalize_endpoint_payload( + response.get('advertised_endpoints', []) + ) remote_protocol = connection.protocol_version or 1 remote_canopy = str(connection.canopy_version or '0.1.0') @@ -1043,8 +1105,8 @@ async def _perform_handshake(self, connection: PeerConnection) -> bool: return True except Exception as e: - if isinstance(e, websockets.exceptions.ConnectionClosedOK): - logger.info(f"Handshake closed cleanly with {connection.peer_id}: {e}") + if isinstance(e, websockets.exceptions.ConnectionClosed): + logger.debug(f"Handshake with {connection.peer_id} closed by remote: {e}") else: logger.error(f"Handshake failed: {e}", exc_info=True) connection.failure_reason = type(e).__name__ @@ -1089,6 +1151,7 @@ async def _complete_handshake(self, connection: PeerConnection, **signed_payload, 'canopy_version': self.local_canopy_version, 'protocol_version': self.local_protocol_version, + 'advertised_endpoints': self._get_advertised_endpoints(), 'signature': signature.hex() } diff --git a/canopy/network/manager.py b/canopy/network/manager.py index abb09ba3..b4c7d235 100644 --- a/canopy/network/manager.py +++ b/canopy/network/manager.py @@ -1033,6 +1033,7 @@ async def _startup(self) -> None: tls_cert_path=getattr(network_config, 'tls_cert_path', None), tls_key_path=getattr(network_config, 'tls_key_path', None), handshake_capabilities=self._local_capabilities, + advertised_endpoints_provider=self._get_local_advertised_endpoints, canopy_version=self.local_canopy_version, protocol_version=self.local_protocol_version, reject_protocol_mismatch=bool( @@ -1345,6 +1346,23 @@ def _get_advertisable_peer_endpoints(self, peer_id: str) -> list[str]: return stored return self._get_discovered_peer_endpoints(peer_id) + def _get_local_advertised_endpoints(self) -> list[str]: + """Return local endpoints safe to advertise during handshake.""" + if not self.identity_manager or not self.identity_manager.local_identity: + return [] + try: + from .invite import generate_invite + + mesh_port = int(getattr(self.config.network, 'mesh_port', 0) or 0) + if mesh_port <= 0: + return [] + invite = generate_invite(self.identity_manager, mesh_port) + local_peer_id = self.local_identity.peer_id if self.local_identity else '' + return self._sanitize_endpoints(local_peer_id, list(invite.endpoints or [])) + except Exception as exc: + logger.debug("Could not compute local advertised endpoints: %s", exc) + return [] + @staticmethod def _merge_endpoint_lists(*endpoint_groups: list[str]) -> list[str]: """Merge endpoint lists while preserving order and removing duplicates.""" @@ -1358,6 +1376,33 @@ def _merge_endpoint_lists(*endpoint_groups: list[str]) -> list[str]: merged.append(endpoint) return merged + @staticmethod + def _endpoint_reconnect_priority(endpoint: str) -> tuple[int, str]: + """Prefer public/tunnel endpoints over stale private remembered ones.""" + parsed = P2PNetworkManager._parse_endpoint(endpoint) + if not parsed: + return (99, endpoint) + host, _, scheme = parsed + text = str(host or '').strip().lower() + if not text: + return (99, endpoint) + if text == 'localhost' or text.startswith('127.'): + return (3, endpoint) + + try: + import ipaddress + + ip = ipaddress.ip_address(text) + if ip.is_loopback or ip.is_unspecified: + return (3, endpoint) + if ip.is_private or ip.is_link_local: + return (2, endpoint) + return (0 if scheme == 'wss' else 1, endpoint) + except ValueError: + # Hostnames and tunnel domains are typically the best reconnect path + # once discovery is no longer giving us a live LAN address. + return (0 if scheme == 'wss' else 1, endpoint) + def _get_connectable_peer_endpoints(self, peer_id: str, prefer_discovered: bool = True) -> list[str]: """Return the best available endpoints for dialing a peer. @@ -1371,6 +1416,7 @@ def _get_connectable_peer_endpoints(self, peer_id: str, prefer_discovered: bool if stored != self.identity_manager.peer_endpoints.get(peer_id, []): self.identity_manager.peer_endpoints[peer_id] = stored self.identity_manager._save_known_peers() + stored = sorted(stored, key=self._endpoint_reconnect_priority) discovered = self._get_discovered_peer_endpoints(peer_id) if discovered: self._remember_discovered_peer_endpoints(peer_id) @@ -1402,6 +1448,67 @@ def _remember_discovered_peer_endpoints(self, peer_id: str) -> list[str]: ) return endpoints + def _remember_peer_advertised_endpoints( + self, + peer_id: str, + endpoints: list[str], + *, + source: str = 'peer_advertised', + ) -> list[str]: + """Persist explicit dial-back endpoints announced by an authenticated peer.""" + clean_peer_id = str(peer_id or '').strip() + if not clean_peer_id: + return [] + sanitized = self._sanitize_endpoints(clean_peer_id, endpoints or []) + if not sanitized: + return [] + + existing = list(self.identity_manager.peer_endpoints.get(clean_peer_id, []) or []) + merged = self._merge_endpoint_lists(existing, sanitized) + if merged != existing: + self.identity_manager.peer_endpoints[clean_peer_id] = merged + self.identity_manager._save_known_peers() + logger.info( + "Recorded %d advertised endpoint(s) for %s via %s", + len(sanitized), + clean_peer_id, + source, + ) + + introduced = self._introduced_peers.get(clean_peer_id) + if isinstance(introduced, dict): + prior_sources = list(introduced.get('endpoint_sources') or []) + if source and source not in prior_sources: + prior_sources.append(source) + introduced['endpoint_sources'] = prior_sources + introduced['endpoint_provenance'] = source + introduced['endpoints'] = self._merge_endpoint_lists( + list(introduced.get('endpoints') or []), + sanitized, + ) + return sanitized + + def _remember_live_peer_endpoints(self, peer_id: str) -> list[str]: + """Persist reusable endpoints learned from the authenticated live connection.""" + if not self.connection_manager or not peer_id: + return [] + conn = self.connection_manager.get_connection(peer_id) + if not conn: + return [] + + advertised = list(getattr(conn, 'advertised_endpoints', []) or []) + remembered = self._remember_peer_advertised_endpoints(peer_id, advertised) + if remembered: + return remembered + + introduced = self._introduced_peers.get(peer_id) if hasattr(self, '_introduced_peers') else None + if isinstance(introduced, dict) and not introduced.get('endpoints'): + logger.info( + "Peer %s is connected but has no reusable advertised endpoints; introductions may require broker fallback", + peer_id, + ) + return [] + def _record_endpoint_result( self, peer_id: str, @@ -1872,6 +1979,14 @@ def _on_incoming_peer_authenticated(self, peer_id: str, peer_meta: Optional[Dict status='connected', detail='Incoming connection authenticated', ) + if isinstance(peer_meta, dict): + try: + self._remember_peer_advertised_endpoints( + peer_id, + list(peer_meta.get('advertised_endpoints') or []), + ) + except Exception: + pass # Do not invent reconnect endpoints from socket origin addresses. Only # persist discovery-derived endpoints, which are authoritative enough # to survive reconnects and peer announcements. @@ -2029,6 +2144,7 @@ async def _run_post_connect_sync_impl(self, peer_id: str) -> None: # Only cancel reconnect once the current connection has survived the # settle window; otherwise a brief flap can strand the peer. self._cancel_reconnect(peer_id) + self._remember_live_peer_endpoints(peer_id) # Notify application layer if self.on_peer_connected: @@ -2214,6 +2330,12 @@ async def _send_peer_announcement_to(self, peer_id: str) -> None: if not identity: continue endpoints = self._get_advertisable_peer_endpoints(pid) + if not endpoints: + logger.info( + "Peer %s is connected but has no re-advertisable endpoints while announcing to %s; broker fallback may be required", + pid, + peer_id, + ) device_profile = None if self.get_peer_device_profile: try: @@ -2256,6 +2378,11 @@ async def _announce_new_peer_to_others(self, new_peer_id: str) -> None: if not identity: return endpoints = self._get_advertisable_peer_endpoints(new_peer_id) + if not endpoints: + logger.info( + "New peer %s has no re-advertisable endpoints; downstream peers may need broker fallback", + new_peer_id, + ) device_profile = None if self.get_peer_device_profile: try: @@ -2389,7 +2516,99 @@ def get_introduced_peers(self) -> list: """Return the list of peers introduced by our contacts.""" if not hasattr(self, '_introduced_peers'): self._introduced_peers = {} - return list(self._introduced_peers.values()) + rows: list[dict[str, Any]] = [] + for peer_id, record in self._introduced_peers.items(): + if not isinstance(record, dict): + continue + introduced_endpoints = self._sanitize_endpoints( + peer_id, + list(record.get('endpoints') or []), + ) + stored_endpoints = self._sanitize_endpoints( + peer_id, + self.identity_manager.peer_endpoints.get(peer_id, []), + ) + discovered_endpoints = self._get_discovered_peer_endpoints(peer_id) + merged_endpoints = self._merge_endpoint_lists( + discovered_endpoints, + introduced_endpoints, + stored_endpoints, + ) + endpoint_sources: list[str] = [] + if discovered_endpoints: + endpoint_sources.append('discovered') + if introduced_endpoints: + endpoint_sources.append('introduced') + if stored_endpoints: + endpoint_sources.append('stored') + for source in list(record.get('endpoint_sources') or []): + text = str(source or '').strip() + if text and text not in endpoint_sources: + endpoint_sources.append(text) + + broker_candidates = self.get_introduced_peer_broker_candidates(peer_id) + connect_strategy = 'direct_or_broker' + if not merged_endpoints: + connect_strategy = 'broker_only' if broker_candidates else 'unreachable' + + row = dict(record) + row['peer_id'] = peer_id + row['endpoints'] = merged_endpoints + row['endpoint_count'] = len(merged_endpoints) + row['endpoint_sources'] = endpoint_sources or ['none'] + row['connect_strategy'] = connect_strategy + row['broker_candidates'] = broker_candidates + rows.append(row) + return rows + + def get_introduced_peer_broker_candidates(self, peer_id: str) -> list[str]: + """Return currently reachable broker peers for a given introduced peer.""" + clean_peer_id = str(peer_id or '').strip() + if not clean_peer_id or not hasattr(self, '_introduced_peers'): + return [] + intro = self._introduced_peers.get(clean_peer_id) + if not isinstance(intro, dict): + return [] + + connected_peers: list[str] = [] + try: + connected_peers = list(self.get_connected_peers() or []) + except Exception: + connected_peers = [] + connected_set = set(connected_peers) + + local_peer_id = '' + try: + local_peer_id = self.get_peer_id() or '' + except Exception: + local_peer_id = '' + + introducers: list[str] = [] + introduced_via = intro.get('introduced_via', []) + if isinstance(introduced_via, list): + for pid in introduced_via: + text = str(pid or '').strip() + if text: + introducers.append(text) + introduced_by = str(intro.get('introduced_by') or '').strip() + if introduced_by: + introducers.append(introduced_by) + + broker_candidates: list[str] = [] + seen_brokers: set[str] = set() + + for pid in introducers: + if pid in connected_set and pid not in seen_brokers: + seen_brokers.add(pid) + broker_candidates.append(pid) + + for pid in connected_peers: + if not pid or pid == clean_peer_id or pid == local_peer_id or pid in seen_brokers: + continue + seen_brokers.add(pid) + broker_candidates.append(pid) + + return broker_candidates def get_peer_public_identity(self, peer_id: str) -> Dict[str, Any]: """Return a public-safe identity preview for a peer before trust.""" @@ -2688,15 +2907,17 @@ def send_broker_request(self, target_peer_id: str, 'x25519_public_key': base58.b58encode(local_id.x25519_public_key).decode(), } - # Gather our own endpoints - mesh_port = self.config.network.mesh_port - requester_endpoints = [] - try: - from .invite import get_local_ips - for ip in get_local_ips(): - requester_endpoints.append(f"{self.ws_scheme}://{ip}:{mesh_port}") - except Exception: - requester_endpoints.append(f"{self.ws_scheme}://0.0.0.0:{mesh_port}") + # Prefer the same public/tunnel endpoints we advertise in handshakes so + # brokered connect-backs do not regress to LAN-only addresses. + requester_endpoints = list(self._get_local_advertised_endpoints() or []) + if not requester_endpoints: + mesh_port = self.config.network.mesh_port + try: + from .invite import get_local_ips + for ip in get_local_ips(): + requester_endpoints.append(f"{self.ws_scheme}://{ip}:{mesh_port}") + except Exception: + requester_endpoints.append(f"{self.ws_scheme}://0.0.0.0:{mesh_port}") future = asyncio.run_coroutine_threadsafe( self.message_router.send_broker_request( @@ -2718,7 +2939,7 @@ def send_broker_request(self, target_peer_id: str, via_peer=via_peer_id, ) else: - logger.warning( + logger.info( f"Broker request via {via_peer_id} for {target_peer_id} " f"was not routed immediately" ) diff --git a/canopy/ui/templates/connect.html b/canopy/ui/templates/connect.html index e75a9e52..1ac6b914 100644 --- a/canopy/ui/templates/connect.html +++ b/canopy/ui/templates/connect.html @@ -570,7 +570,7 @@
Discovered Peers (LAN)
Peers via Your Contacts
-

These peers were introduced by your connected contacts. Click Connect to establish a direct link.

+

These peers were introduced by your connected contacts. If a peer has no direct endpoint, Canopy will fall back to broker-assisted connection through the introducer.

@@ -612,9 +612,25 @@
Peers via Your Contacts
@@ -908,7 +925,7 @@
Connection Troubleshooting
@@ -952,7 +969,7 @@
1 Same network (LAN)
2 Different location
-

One person port-forwards {{ mesh_port }} on their router, enters their public IP above, then sends the invite code to the friend.

+

One person shares a reachable endpoint: public IP plus port-forwarding, a VPS public IP, or a full tunnel endpoint such as wss://....

3 Friend imports the code
@@ -1077,8 +1094,11 @@
3 Friend imports the code
res.innerHTML = ' Connected to ' + safePeerId + ' via ' + safeEndpoint + '!'; } else if (data.status === 'imported_not_connected') { res.className = 'mt-3 alert alert-warning'; - res.innerHTML = ' Peer ' + safePeerId + ' registered but could not connect. ' + - 'Make sure the peer is online and the endpoint is reachable (port forwarded?).'; + if (data.diagnostic_code === 'invite_has_no_usable_endpoints') { + res.innerHTML = ' Peer ' + safePeerId + ' registered, but the invite did not include any usable direct endpoints. Ask the remote peer to regenerate the invite with a public or tunnel endpoint.'; + } else { + res.innerHTML = ' Peer ' + safePeerId + ' registered but could not connect to any advertised endpoint. The peer may be offline, the endpoint may be stale, or the connection may require broker/relay help.'; + } } else if (data.error) { res.className = 'mt-3 alert alert-danger'; res.innerHTML = ' Error: ' + safeError; diff --git a/pyproject.toml b/pyproject.toml index 003daded..e88a3c41 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "canopy" -version = "0.6.0" +version = "0.6.2" description = "Local-first peer-to-peer collaboration for humans and AI agents." readme = "README.md" requires-python = ">=3.10" diff --git a/tests/test_agent_reliability_endpoints.py b/tests/test_agent_reliability_endpoints.py index d87e75be..66346091 100644 --- a/tests/test_agent_reliability_endpoints.py +++ b/tests/test_agent_reliability_endpoints.py @@ -93,7 +93,7 @@ def get_discovered_peers(self): return [ { 'peer_id': 'peer-alpha', - 'address': '192.168.1.11', + 'address': '198.51.100.11', 'port': 7771, 'connected': True, } diff --git a/tests/test_api_session_fallback.py b/tests/test_api_session_fallback.py index 3fcf784a..5b57994a 100644 --- a/tests/test_api_session_fallback.py +++ b/tests/test_api_session_fallback.py @@ -254,6 +254,39 @@ def test_reconnect_reports_relayed_when_peer_is_reachable_via_relay(self) -> Non self.assertEqual(payload.get('relay_via'), 'relay-1') self.assertEqual(payload.get('relay_via_name'), 'Relay Node') + def test_reconnect_prefers_public_endpoint_over_stale_lan_history(self) -> None: + csrf_token = 'csrf-reconnect-order' + self._set_authenticated_session(csrf_token=csrf_token) + self.p2p_manager._event_loop = MagicMock() + self.p2p_manager._event_loop.is_closed.return_value = False + self.p2p_manager.identity_manager.peer_endpoints = { + 'peer-beta': ['ws://198.51.100.159:7771', 'wss://vps.example.com:443'] + } + + class _ImmediateFuture: + def __init__(self, result): + self._result = result + + def result(self, timeout=None): + return self._result + + with patch( + 'asyncio.run_coroutine_threadsafe', + side_effect=[_ImmediateFuture(False), _ImmediateFuture(True)], + ): + response = self.client.post( + '/api/v1/p2p/reconnect', + json={'peer_id': 'peer-beta'}, + headers={'X-CSRFToken': csrf_token}, + ) + + self.assertEqual(response.status_code, 200) + call_args = self.p2p_manager.connection_manager.connect_to_peer.call_args_list + self.assertEqual(call_args[0].args[1:], ('vps.example.com', 443)) + self.assertEqual(call_args[0].kwargs.get('scheme'), 'wss') + self.assertEqual(call_args[1].args[1:], ('198.51.100.159', 7771)) + self.assertEqual(call_args[1].kwargs.get('scheme'), 'ws') + def test_authorization_header_parses_lowercase_bearer_scheme(self) -> None: key_info = ApiKeyInfo( id='key-test', diff --git a/tests/test_api_stream_endpoints.py b/tests/test_api_stream_endpoints.py index d1e58d2b..a98577c3 100644 --- a/tests/test_api_stream_endpoints.py +++ b/tests/test_api_stream_endpoints.py @@ -420,14 +420,14 @@ def test_stream_proxy_allows_known_private_peer_host(self) -> None: 'peer-remote', json.dumps([{ 'stream_id': 'remote-stream-ok', - 'host_addrs': ['http://10.0.0.5:7770'], + 'host_addrs': ['http://198.51.100.5:7770'], }]), datetime.now(timezone.utc).isoformat(), ), ) self.conn.commit() self.p2p_manager.identity_manager.peer_endpoints = { - 'peer-remote': ['10.0.0.5:7771'], + 'peer-remote': ['198.51.100.5:7771'], } self._set_authenticated_session() @@ -463,8 +463,8 @@ def __exit__(self, exc_type, exc, tb): self.assertEqual( called_urls, [ - 'http://10.0.0.5:7770/api/v1/streams/remote-stream-ok/manifest.m3u8', - 'http://10.0.0.5:7770/api/v1/streams/remote-stream-ok/manifest.m3u8', + 'http://198.51.100.5:7770/api/v1/streams/remote-stream-ok/manifest.m3u8', + 'http://198.51.100.5:7770/api/v1/streams/remote-stream-ok/manifest.m3u8', ], ) @@ -484,7 +484,7 @@ def test_stream_proxy_ignores_private_host_not_linked_to_origin_peer(self) -> No ) self.conn.commit() self.p2p_manager.identity_manager.peer_endpoints = { - 'peer-remote': ['10.0.0.5:7771'], + 'peer-remote': ['198.51.100.5:7771'], } self._set_authenticated_session() @@ -516,7 +516,7 @@ def __exit__(self, exc_type, exc, tb): self.assertEqual(response.status_code, 200) called_urls = [call.args[0] for call in mocked_urlopen.call_args_list] self.assertTrue(all('127.0.0.1' not in url for url in called_urls)) - self.assertTrue(all('10.0.0.5' in url for url in called_urls)) + self.assertTrue(all('198.51.100.5' in url for url in called_urls)) def test_stream_proxy_segment_rejects_invalid_segment_name(self) -> None: self._set_authenticated_session() @@ -534,14 +534,14 @@ def test_stream_proxy_segment_forces_binary_type_for_unsafe_remote_content_type( 'peer-remote', json.dumps([{ 'stream_id': 'remote-stream-segment', - 'host_addrs': ['http://10.0.0.5:7770'], + 'host_addrs': ['http://198.51.100.5:7770'], }]), datetime.now(timezone.utc).isoformat(), ), ) self.conn.commit() self.p2p_manager.identity_manager.peer_endpoints = { - 'peer-remote': ['10.0.0.5:7771'], + 'peer-remote': ['198.51.100.5:7771'], } self._set_authenticated_session() diff --git a/tests/test_connect_introduced_broker_fallback.py b/tests/test_connect_introduced_broker_fallback.py index 9fd023e9..65c039ed 100644 --- a/tests/test_connect_introduced_broker_fallback.py +++ b/tests/test_connect_introduced_broker_fallback.py @@ -49,6 +49,8 @@ def setUp(self) -> None: self.p2p_manager.get_peer_id.return_value = 'local-peer' self.p2p_manager.get_connected_peers.return_value = [] self.p2p_manager.send_broker_request.return_value = False + self.p2p_manager.get_introduced_peer_broker_candidates = MagicMock(return_value=[]) + self.p2p_manager.identity_manager = types.SimpleNamespace(peer_endpoints={}) self.p2p_manager.connection_manager = MagicMock() self.p2p_manager.connection_manager.connect_to_peer = MagicMock() @@ -104,7 +106,7 @@ def test_connect_introduced_tries_multiple_brokers_until_one_succeeds(self) -> N peer_id = 'peer-target' self.p2p_manager._introduced_peers[peer_id] = { 'peer_id': peer_id, - 'endpoints': ['ws://10.10.10.10:7771'], + 'endpoints': ['ws://203.0.113.10:7771'], 'introduced_by': 'broker-a', 'introduced_via': ['broker-a', 'broker-b'], } @@ -129,7 +131,7 @@ def test_connect_introduced_falls_back_to_other_connected_peer(self) -> None: peer_id = 'peer-target' self.p2p_manager._introduced_peers[peer_id] = { 'peer_id': peer_id, - 'endpoints': ['ws://10.10.10.10:7771'], + 'endpoints': ['ws://203.0.113.10:7771'], 'introduced_by': 'offline-broker', } self.p2p_manager.get_connected_peers.return_value = ['online-broker'] @@ -152,7 +154,7 @@ def test_connect_introduced_failure_includes_relay_guidance(self) -> None: peer_id = 'peer-target' self.p2p_manager._introduced_peers[peer_id] = { 'peer_id': peer_id, - 'endpoints': ['ws://10.10.10.10:7771'], + 'endpoints': ['ws://203.0.113.10:7771'], 'introduced_by': 'offline-broker', 'introduced_via': ['offline-broker', 'offline-broker-2'], } @@ -167,16 +169,35 @@ def test_connect_introduced_failure_includes_relay_guidance(self) -> None: self.assertEqual(payload.get('status'), 'failed') self.assertEqual(payload.get('relay_policy'), 'broker_only') self.assertIn('Relay policy is broker_only', payload.get('message', '')) - self.assertEqual( - payload.get('attempted_brokers'), - ['offline-broker', 'offline-broker-2'], - ) + self.assertEqual(payload.get('attempted_brokers'), []) + + def test_connect_introduced_without_direct_endpoints_uses_broker(self) -> None: + peer_id = 'peer-target' + self.p2p_manager._introduced_peers[peer_id] = { + 'peer_id': peer_id, + 'endpoints': [], + 'introduced_by': 'broker-a', + 'introduced_via': ['broker-a'], + } + self.p2p_manager.get_introduced_peer_broker_candidates.return_value = ['broker-a'] + self.p2p_manager.send_broker_request.return_value = True + + response = self._post_connect_introduced(peer_id) + + self.assertEqual(response.status_code, 202) + payload = response.get_json() or {} + self.assertEqual(payload.get('status'), 'brokering') + self.assertEqual(payload.get('via_peer'), 'broker-a') + self.assertEqual(payload.get('diagnostic_code'), 'introduced_peer_broker_connect') + self.assertFalse(payload.get('direct_attempted')) + self.assertEqual(payload.get('direct_attempt_count'), 0) + self.assertIn('No direct endpoints were announced', payload.get('message', '')) def test_connect_introduced_force_broker_skips_direct_attempts(self) -> None: peer_id = 'peer-target' self.p2p_manager._introduced_peers[peer_id] = { 'peer_id': peer_id, - 'endpoints': ['ws://10.10.10.10:7771'], + 'endpoints': ['ws://203.0.113.10:7771'], 'introduced_via': ['broker-a'], } self.p2p_manager.get_connected_peers.return_value = ['broker-a'] @@ -197,6 +218,26 @@ def test_connect_introduced_force_broker_skips_direct_attempts(self) -> None: self.assertEqual(payload.get('attempted_brokers'), ['broker-a']) run_coro.assert_not_called() + def test_connect_introduced_prefers_public_endpoint_over_stale_lan_history(self) -> None: + peer_id = 'peer-target' + self.p2p_manager._introduced_peers[peer_id] = { + 'peer_id': peer_id, + 'endpoints': ['ws://198.51.100.159:7771', 'wss://vps.example.com:443'], + } + + with patch( + 'asyncio.run_coroutine_threadsafe', + side_effect=[_ImmediateFuture(False), _ImmediateFuture(True)], + ): + response = self._post_connect_introduced(peer_id) + + self.assertEqual(response.status_code, 200) + call_args = self.p2p_manager.connection_manager.connect_to_peer.call_args_list + self.assertEqual(call_args[0].args[1:], ('vps.example.com', 443)) + self.assertEqual(call_args[0].kwargs.get('scheme'), 'wss') + self.assertEqual(call_args[1].args[1:], ('198.51.100.159', 7771)) + self.assertEqual(call_args[1].kwargs.get('scheme'), 'ws') + if __name__ == '__main__': unittest.main() diff --git a/tests/test_connection_diagnostics_endpoint.py b/tests/test_connection_diagnostics_endpoint.py index fffe71a9..6fe096a3 100644 --- a/tests/test_connection_diagnostics_endpoint.py +++ b/tests/test_connection_diagnostics_endpoint.py @@ -257,20 +257,20 @@ def test_disconnected_known_peer_included_with_endpoint_details(self): """Known but disconnected peers should still appear with endpoint diagnostics.""" self.p2p_manager.identity_manager.known_peers = {'peer-abc': object()} self.p2p_manager.identity_manager.peer_endpoints = { - 'peer-abc': ['ws://192.168.1.50:7771'] + 'peer-abc': ['ws://198.51.100.50:7771'] } self.p2p_manager.get_discovered_peers.return_value = [ { 'peer_id': 'peer-abc', - 'address': '192.168.1.50', - 'addresses': ['192.168.1.50'], + 'address': '198.51.100.50', + 'addresses': ['198.51.100.50'], 'port': 7771, 'connected': False, } ] self.p2p_manager.get_peer_endpoint_diagnostics.side_effect = lambda peer_id: ( [{ - 'endpoint': 'ws://192.168.1.50:7771', + 'endpoint': 'ws://198.51.100.50:7771', 'sources': ['stored', 'discovered'], 'currently_connected': False, 'attempt_count': 2, diff --git a/tests/test_connection_log_noise_regressions.py b/tests/test_connection_log_noise_regressions.py index df7371ae..1f62adec 100644 --- a/tests/test_connection_log_noise_regressions.py +++ b/tests/test_connection_log_noise_regressions.py @@ -114,16 +114,16 @@ async def test_handshake_peerid_mismatch_reassigns_endpoint_to_actual_peer(self) manager._record_handshake_peerid_mismatch( expected_peer_id='peer-expected', actual_peer_id='peer-actual', - endpoint_uri='ws://192.168.1.50:7771/p2p', + endpoint_uri='ws://198.51.100.50:7771/p2p', ) self.assertEqual( identity_manager.removed, - [('peer-expected', 'ws://192.168.1.50:7771')], + [('peer-expected', 'ws://198.51.100.50:7771')], ) self.assertEqual( identity_manager.recorded, - [('peer-actual', 'ws://192.168.1.50:7771', True)], + [('peer-actual', 'ws://198.51.100.50:7771', True)], ) diff --git a/tests/test_meshspaces_foundation.py b/tests/test_meshspaces_foundation.py index 270ca801..36bc1713 100644 --- a/tests/test_meshspaces_foundation.py +++ b/tests/test_meshspaces_foundation.py @@ -380,7 +380,7 @@ def test_meshspace_shell_summary_endpoint_requires_loopback(self) -> None: blocked = self.client.get( '/api/v1/meshspace/shell_summary', - environ_overrides={'REMOTE_ADDR': '10.0.0.8'}, + environ_overrides={'REMOTE_ADDR': '203.0.113.8'}, ) self.assertEqual(blocked.status_code, 403) @@ -1046,9 +1046,9 @@ def test_meshspace_direct_url_uses_request_host_for_loopback_launch_url(self) -> 'launch_url': 'http://127.0.0.1:7800', }, current_meshspace_id='family-lab', - request_host='192.168.1.77', + request_host='198.51.100.77', ) - self.assertEqual(target, 'http://192.168.1.77:7800/login') + self.assertEqual(target, 'http://198.51.100.77:7800/login') def test_meshspace_open_shows_stale_runtime_copy_when_registry_is_stale(self) -> None: self._authenticate() diff --git a/tests/test_network_connectivity_regressions.py b/tests/test_network_connectivity_regressions.py index 1b448f3e..685fbe24 100644 --- a/tests/test_network_connectivity_regressions.py +++ b/tests/test_network_connectivity_regressions.py @@ -96,9 +96,9 @@ def test_import_invite_persists_only_sanitized_endpoints_for_reconnect(self) -> ed25519_public_key_b58='11111111111111111111111111111111', x25519_public_key_b58='11111111111111111111111111111111', endpoints=[ - ' ws://192.168.1.50:7771 ', + ' ws://198.51.100.50:7771 ', 'ws://[2001:db8::10]:7771', - 'ws://192.168.1.50:7771', + 'ws://198.51.100.50:7771', 'localhost:7771', 'ws://0.0.0.0:7771', 'not-an-endpoint', @@ -109,11 +109,11 @@ def test_import_invite_persists_only_sanitized_endpoints_for_reconnect(self) -> self.assertEqual( identity_manager.peer_endpoints.get('peer-remote'), - ['ws://192.168.1.50:7771', 'ws://[2001:db8::10]:7771'], + ['ws://198.51.100.50:7771', 'ws://[2001:db8::10]:7771'], ) self.assertEqual( imported['endpoints'], - ['ws://192.168.1.50:7771', 'ws://[2001:db8::10]:7771'], + ['ws://198.51.100.50:7771', 'ws://[2001:db8::10]:7771'], ) def test_generate_invite_accepts_explicit_external_mesh_endpoint(self) -> None: @@ -162,15 +162,15 @@ def test_discovery_preserves_all_advertised_addresses(self) -> None: zeroconf = _FakeZeroconf( _FakeServiceInfo( peer_id='peer-remote', - addresses=[b'\x0a\x00\x00\x02', b'\xc0\xa8\x01\x64'], + addresses=[b'\xc0\x00\x02\x02', b'\xc6\x33\x64\x64'], ) ) discovery._on_service_added(zeroconf, discovery.service_type, 'peer-remote._canopy._tcp.local.') self.assertEqual(len(captured), 1) - self.assertEqual(captured[0].address, '10.0.0.2') - self.assertEqual(captured[0].addresses, ['10.0.0.2', '192.168.1.100']) + self.assertEqual(captured[0].address, '192.0.2.2') + self.assertEqual(captured[0].addresses, ['192.0.2.2', '198.51.100.100']) async def test_connect_to_discovered_peer_tries_all_advertised_addresses(self) -> None: manager = self._build_manager() @@ -179,7 +179,7 @@ async def test_connect_to_discovered_peer_tries_all_advertised_addresses(self) - async def _connect(peer_id: str, endpoint: str) -> bool: attempts.append(endpoint) - return endpoint.endswith('192.168.1.100:7771') + return endpoint.endswith('198.51.100.100:7771') async def _sync(peer_id: str) -> None: sync_calls.append(peer_id) @@ -190,8 +190,8 @@ async def _sync(peer_id: str) -> None: peer = DiscoveredPeer( peer_id='peer-remote', - address='10.0.0.2', - addresses=['10.0.0.2', '192.168.1.100'], + address='192.0.2.2', + addresses=['192.0.2.2', '198.51.100.100'], port=7771, discovered_at=0.0, ) @@ -200,7 +200,7 @@ async def _sync(peer_id: str) -> None: self.assertEqual( attempts, - ['ws://10.0.0.2:7771', 'ws://192.168.1.100:7771'], + ['ws://192.0.2.2:7771', 'ws://198.51.100.100:7771'], ) self.assertEqual(sync_calls, ['peer-remote']) @@ -272,7 +272,7 @@ def test_discovered_peer_endpoints_format_ipv6_for_dialing(self) -> None: async def test_peer_announcement_uses_stored_endpoints_not_socket_origin(self) -> None: manager = self._build_manager() manager.identity_manager.peer_display_names['peer-remote'] = 'Remote Node' - manager.identity_manager.peer_endpoints['peer-remote'] = ['ws://192.168.1.55:7771'] + manager.identity_manager.peer_endpoints['peer-remote'] = ['ws://198.51.100.55:7771'] manager.identity_manager.known_peers['peer-remote'] = types.SimpleNamespace( ed25519_public_key=b'1' * 32, x25519_public_key=b'2' * 32, @@ -298,7 +298,7 @@ async def send_peer_announcement(self, to_peer: str, introduced_peers: list[dict self.assertEqual(len(captured), 1) self.assertEqual( captured[0][0]['endpoints'], - ['ws://192.168.1.55:7771'], + ['ws://198.51.100.55:7771'], ) def test_current_connection_endpoint_preserves_wss_scheme_from_endpoint_uri(self) -> None: @@ -316,6 +316,115 @@ def test_current_connection_endpoint_preserves_wss_scheme_from_endpoint_uri(self 'wss://demo.ngrok-free.app:443', ) + def test_incoming_authenticated_persists_peer_advertised_endpoints(self) -> None: + manager = self._build_manager() + manager._event_loop = None + manager._introduced_peers = { + 'peer-remote': { + 'peer_id': 'peer-remote', + 'introduced_by': 'broker-a', + 'endpoints': [], + } + } + + manager._on_incoming_peer_authenticated( + 'peer-remote', + { + 'advertised_endpoints': ['wss://demo.example.com:443'], + 'capabilities': [], + }, + ) + + self.assertEqual( + manager.identity_manager.peer_endpoints.get('peer-remote'), + ['wss://demo.example.com:443'], + ) + self.assertEqual( + manager._introduced_peers['peer-remote']['endpoints'], + ['wss://demo.example.com:443'], + ) + self.assertIn( + 'peer_advertised', + manager._introduced_peers['peer-remote'].get('endpoint_sources', []), + ) + + def test_get_introduced_peers_marks_broker_only_when_no_direct_endpoints(self) -> None: + manager = self._build_manager() + manager._introduced_peers = { + 'peer-remote': { + 'peer_id': 'peer-remote', + 'introduced_by': 'broker-a', + 'endpoints': [], + } + } + manager.get_connected_peers = lambda: ['broker-a'] + manager.get_peer_id = lambda: 'local-peer' + + introduced = manager.get_introduced_peers() + + self.assertEqual(len(introduced), 1) + self.assertEqual(introduced[0]['connect_strategy'], 'broker_only') + self.assertEqual(introduced[0]['endpoint_sources'], ['none']) + self.assertEqual(introduced[0]['broker_candidates'], ['broker-a']) + + def test_get_introduced_peers_marks_unreachable_without_connected_broker(self) -> None: + manager = self._build_manager() + manager._introduced_peers = { + 'peer-remote': { + 'peer_id': 'peer-remote', + 'introduced_by': 'offline-broker', + 'introduced_via': ['offline-broker'], + 'endpoints': [], + } + } + manager.get_connected_peers = lambda: [] + manager.get_peer_id = lambda: 'local-peer' + + introduced = manager.get_introduced_peers() + + self.assertEqual(len(introduced), 1) + self.assertEqual(introduced[0]['connect_strategy'], 'unreachable') + self.assertEqual(introduced[0]['broker_candidates'], []) + + def test_send_broker_request_prefers_advertised_endpoints(self) -> None: + manager = self._build_manager() + manager._running = True + manager._event_loop = MagicMock() + manager.message_router = types.SimpleNamespace(send_broker_request=MagicMock()) + manager.local_identity = types.SimpleNamespace( + peer_id='local-peer', + ed25519_public_key=b'\x01' * 32, + x25519_public_key=b'\x02' * 32, + ) + manager._record_connection_event = lambda *args, **kwargs: None # type: ignore[assignment] + manager._get_local_advertised_endpoints = lambda: ['wss://demo.example.com:443'] # type: ignore[assignment] + + future = MagicMock() + future.result.return_value = True + + with patch('asyncio.run_coroutine_threadsafe', return_value=future): + sent = manager.send_broker_request('peer-target', 'broker-a') + + self.assertTrue(sent) + self.assertEqual( + manager.message_router.send_broker_request.call_args.kwargs['requester_endpoints'], + ['wss://demo.example.com:443'], + ) + + def test_connectable_endpoints_prefer_public_over_stale_private_storage(self) -> None: + manager = self._build_manager() + manager.identity_manager.peer_endpoints['peer-remote'] = [ + 'ws://198.51.100.159:7771', + 'wss://vps.example.com:443', + ] + + endpoints = manager._get_connectable_peer_endpoints('peer-remote', prefer_discovered=True) + + self.assertEqual( + endpoints, + ['wss://vps.example.com:443', 'ws://198.51.100.159:7771'], + ) + async def test_reconnect_keeps_retrying_after_backoff_cap(self) -> None: manager = self._build_manager() manager._running = True @@ -323,7 +432,7 @@ async def test_reconnect_keeps_retrying_after_backoff_cap(self) -> None: manager.connection_manager = types.SimpleNamespace( is_connected=lambda _peer_id: False ) - manager.identity_manager.peer_endpoints['peer-remote'] = ['ws://192.168.1.50:7771'] + manager.identity_manager.peer_endpoints['peer-remote'] = ['ws://198.51.100.50:7771'] manager._record_connection_event = lambda *args, **kwargs: None # type: ignore[assignment] async def _connect_to_endpoint(peer_id: str, endpoint: str) -> bool: @@ -361,12 +470,12 @@ async def _fast_sleep(_delay: float) -> None: async def test_startup_reconnect_prefers_discovered_endpoints_over_stale_persisted(self) -> None: manager = self._build_manager() manager.identity_manager.known_peers['peer-remote'] = types.SimpleNamespace() - manager.identity_manager.peer_endpoints['peer-remote'] = ['ws://10.0.0.2:7771'] + manager.identity_manager.peer_endpoints['peer-remote'] = ['ws://192.0.2.2:7771'] manager.discovery = types.SimpleNamespace( get_peer=lambda peer_id: DiscoveredPeer( peer_id=peer_id, - address='192.168.1.100', - addresses=['192.168.1.100'], + address='198.51.100.100', + addresses=['198.51.100.100'], port=7771, discovered_at=0.0, ) @@ -377,7 +486,7 @@ async def test_startup_reconnect_prefers_discovered_endpoints_over_stale_persist async def _connect(peer_id: str, endpoint: str) -> bool: attempts.append(endpoint) - if endpoint == 'ws://192.168.1.100:7771': + if endpoint == 'ws://198.51.100.100:7771': connected_peers.add(peer_id) return True return False @@ -400,10 +509,10 @@ async def _fast_sleep(_delay: float) -> None: await manager._reconnect_known_peers() await original_sleep(0) - self.assertEqual(attempts, ['ws://192.168.1.100:7771']) + self.assertEqual(attempts, ['ws://198.51.100.100:7771']) self.assertEqual(sync_calls, ['peer-remote']) self.assertIn( - 'ws://192.168.1.100:7771', + 'ws://198.51.100.100:7771', manager.identity_manager.peer_endpoints.get('peer-remote', []), )
- {% for ep in peer.endpoints %} - {{ ep }} - {% endfor %} + {% if peer.endpoints %} + {% for ep in peer.endpoints %} + {{ ep }} + {% endfor %} + {% if peer.endpoint_sources %} +
+ {% for source in peer.endpoint_sources %} + {{ source }} + {% endfor %} +
+ {% endif %} + {% else %} +
No direct endpoint announced
+ {% if peer.broker_candidates %} +
Broker path available via {{ peer.broker_candidates|length }} peer{{ '' if peer.broker_candidates|length == 1 else 's' }}
+ {% else %} +
Needs a fresh invite or active broker
+ {% endif %} + {% endif %}
{% if ibydev and ibydev.display_name %} @@ -626,7 +642,8 @@
Peers via Your Contacts