diff --git a/CHANGELOG.md b/CHANGELOG.md index 8686643a..79c20d39 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,19 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ## [Unreleased] +## [0.6.2] - 2026-04-07 + +### Fixed +- **Manual reconnect and introduced-peer connect now prefer better public paths more consistently** — the user-triggered reconnect and introduced-peer dial routes now try discovered and public or tunnel endpoints before stale remembered LAN addresses, which makes operator-initiated recovery behave more like the improved background reconnect path. +- **Expected reconnect and probe churn is less noisy in routine logs** — per-attempt outbound connect lines, connect timeouts, missing peer IDs from pre-handshake probes, and broker requests that simply were not routed immediately are now logged more quietly so real connection failures stand out better during catchup and internet-peer debugging. + +## [0.6.1] - 2026-04-07 + +### Fixed +- **Expected websocket handshake churn is quieter in logs** — early disconnects that happen before a peer finishes the websocket opening handshake no longer spray a scary library-level stack trace into server logs when the underlying condition is just a dropped or non-Canopy client connection. +- **Reconnect now prefers learned public or tunnel endpoints over stale private addresses** — once Canopy has a reusable public callback path for a peer, stored reconnect attempts stop burning retries on older `192.168.*` endpoints before trying the internet-reachable address. +- **Introduced-peer brokering is more truthful and useful for internet peers** — broker-only connect flows now require an actually connected broker candidate instead of treating historical introducers as live paths, and broker requests reuse the same advertised public or tunnel endpoints learned by the handshake path. + ## [0.6.0] - 2026-04-06 ### Changed diff --git a/canopy/__init__.py b/canopy/__init__.py index cfe1933b..26474ecc 100644 --- a/canopy/__init__.py +++ b/canopy/__init__.py @@ -8,7 +8,7 @@ License: Apache 2.0 """ -__version__ = "0.6.0" +__version__ = "0.6.2" __protocol_version__ = 1 __author__ = "Canopy Contributors" __license__ = "Apache-2.0" diff --git a/canopy/api/routes.py b/canopy/api/routes.py index 282e2ef2..ef7f014a 100644 --- a/canopy/api/routes.py +++ b/canopy/api/routes.py @@ -3107,7 +3107,18 @@ def import_p2p_invite(): if not connected: result['status'] = 'imported_not_connected' - result['message'] = 'Peer registered but could not connect to any endpoint. Make sure the peer is online and reachable.' + if not list(result.get('endpoints') or []): + result['diagnostic_code'] = 'invite_has_no_usable_endpoints' + result['message'] = ( + 'Peer registered, but the invite did not contain any usable direct endpoints. ' + 'Ask the remote peer to regenerate the invite with a public or tunnel endpoint.' + ) + else: + result['diagnostic_code'] = 'direct_connect_failed' + result['message'] = ( + 'Peer registered but could not connect to any advertised endpoint. ' + 'The peer may be offline, the endpoint may be stale, or the connection may require broker/relay help.' + ) _record_connection_event( p2p_manager, invite.peer_id, @@ -3178,17 +3189,70 @@ def connect_introduced_peer(): if not intro: return jsonify({'error': 'Peer not found in introduced list'}), 404 - endpoints = intro.get('endpoints', []) - if not endpoints: - return jsonify({'error': 'No endpoints available for this peer'}), 400 - ev_loop = p2p_manager._event_loop if not ev_loop or ev_loop.is_closed(): return jsonify({'error': 'P2P event loop unavailable'}), 500 - from ..network.invite import parse_invite_endpoint + from ..network.invite import canonicalize_invite_endpoint, parse_invite_endpoint + import ipaddress + + def _endpoint_priority(endpoint: str) -> tuple[int, str]: + parsed = parse_invite_endpoint(endpoint) + if not parsed: + return (99, endpoint) + host, _, scheme = parsed + text = str(host or '').strip().lower() + if not text: + return (99, endpoint) + if text == 'localhost' or text.startswith('127.'): + return (3, endpoint) + try: + ip = ipaddress.ip_address(text) + except ValueError: + return (0 if scheme == 'wss' else 1, endpoint) + if ip.is_loopback or ip.is_unspecified: + return (3, endpoint) + if ip.is_private or ip.is_link_local: + return (2, endpoint) + return (0 if scheme == 'wss' else 1, endpoint) direct_attempt_count = 0 + discovered_endpoints: list[str] = [] + non_discovered_endpoints: list[str] = [] + seen_endpoints: set[str] = set() + for group in ( + intro.get('endpoints', []), + getattr(getattr(p2p_manager, 'identity_manager', None), 'peer_endpoints', {}).get(peer_id, []), + ): + if not isinstance(group, list): + continue + for endpoint in group: + canon = canonicalize_invite_endpoint(endpoint) + if not canon or canon in seen_endpoints: + continue + seen_endpoints.add(canon) + non_discovered_endpoints.append(canon) + get_discovered = getattr(p2p_manager, '_get_discovered_peer_endpoints', None) + if callable(get_discovered): + try: + for endpoint in list(get_discovered(peer_id) or []): + canon = canonicalize_invite_endpoint(endpoint) + if not canon or canon in seen_endpoints: + continue + seen_endpoints.add(canon) + discovered_endpoints.append(canon) + except Exception: + pass + endpoints = discovered_endpoints + sorted(non_discovered_endpoints, key=_endpoint_priority) + + broker_candidates = [] + get_broker_candidates = getattr(p2p_manager, 'get_introduced_peer_broker_candidates', None) + if callable(get_broker_candidates): + try: + broker_candidates = list(get_broker_candidates(peer_id) or []) + except Exception: + broker_candidates = [] + if force_broker: _record_connection_event( p2p_manager, @@ -3196,7 +3260,7 @@ def connect_introduced_peer(): status='forced_failover', detail='Direct connect skipped by caller; testing broker/relay path', ) - else: + elif endpoints: for ep in endpoints: direct_attempt_count += 1 try: @@ -3240,53 +3304,53 @@ def connect_introduced_peer(): except Exception as ce: logger.warning(f"Connect to introduced {ep} failed: {ce}") continue + else: + _record_connection_event( + p2p_manager, + peer_id, + status='broker_only', + detail='No direct endpoints announced; trying broker path', + ) # Direct connection failed — try connection brokering. # Prefer connected introducers, then other connected peers as fallback. attempted_brokers: list[str] = [] if p2p_manager.relay_policy != 'off': - broker_candidates: list[str] = [] - seen_brokers: set[str] = set() - - connected_peers: list[str] = [] - try: - connected_peers = list(p2p_manager.get_connected_peers() or []) - except Exception: - connected_peers = [] - connected_set = set(connected_peers) + if not broker_candidates: + seen_brokers: set[str] = set() + connected_peers: list[str] = [] + try: + connected_peers = list(p2p_manager.get_connected_peers() or []) + except Exception: + connected_peers = [] + connected_set = set(connected_peers) - local_peer_id = '' - try: - local_peer_id = p2p_manager.get_peer_id() or '' - except Exception: local_peer_id = '' - - introducers: list[str] = [] - introduced_via = intro.get('introduced_via', []) - if isinstance(introduced_via, list): - for pid in introduced_via: - if isinstance(pid, str) and pid: - introducers.append(pid) - introduced_by = intro.get('introduced_by') - if isinstance(introduced_by, str) and introduced_by: - introducers.append(introduced_by) - - connected_introducers = [pid for pid in introducers if pid in connected_set] - disconnected_introducers = [pid for pid in introducers if pid not in connected_set] - - for pid in connected_introducers: - if pid not in seen_brokers: - seen_brokers.add(pid) - broker_candidates.append(pid) - - for pid in connected_peers: - if not pid or pid == peer_id or pid == local_peer_id or pid in seen_brokers: - continue - seen_brokers.add(pid) - broker_candidates.append(pid) - - for pid in disconnected_introducers: - if pid not in seen_brokers: + try: + local_peer_id = p2p_manager.get_peer_id() or '' + except Exception: + local_peer_id = '' + + introducers: list[str] = [] + introduced_via = intro.get('introduced_via', []) + if isinstance(introduced_via, list): + for pid in introduced_via: + if isinstance(pid, str) and pid: + introducers.append(pid) + introduced_by = intro.get('introduced_by') + if isinstance(introduced_by, str) and introduced_by: + introducers.append(introduced_by) + + connected_introducers = [pid for pid in introducers if pid in connected_set] + + for pid in connected_introducers: + if pid not in seen_brokers: + seen_brokers.add(pid) + broker_candidates.append(pid) + + for pid in connected_peers: + if not pid or pid == peer_id or pid == local_peer_id or pid in seen_brokers: + continue seen_brokers.add(pid) broker_candidates.append(pid) @@ -3315,9 +3379,13 @@ def connect_introduced_peer(): 'via_peer': broker_peer, 'attempted_brokers': attempted_brokers, 'forced_failover': force_broker, - 'direct_attempted': not force_broker, + 'direct_attempted': bool(endpoints) and not force_broker, 'direct_attempt_count': direct_attempt_count, + 'diagnostic_code': 'introduced_peer_broker_connect', 'message': ( + 'No direct endpoints were announced; broker request sent. ' + 'The introducer or relay peer will attempt to connect the target back.' + if not endpoints else 'Direct connection failed; broker request sent. ' 'The target peer will attempt to connect back. ' 'If both peers remain unreachable, use a broker with Full Relay enabled.' @@ -3330,7 +3398,26 @@ def connect_introduced_peer(): status='failed', detail='Introduced peer connection failed', ) - guidance = 'Could not connect to any endpoint' + if not endpoints: + guidance = ( + 'This introduced peer does not currently advertise any direct endpoints. ' + 'Keep the introducer online, retry through a broker, or import a fresh raw invite from the target peer.' + ) + if attempted_brokers: + guidance += f" ({len(attempted_brokers)} broker{'s' if len(attempted_brokers) != 1 else ''} tried)" + return jsonify({ + 'status': 'failed', + 'error': 'No endpoints available for this introduced peer record', + 'diagnostic_code': 'introduced_peer_has_no_direct_endpoints', + 'message': guidance, + 'attempted_brokers': attempted_brokers, + 'relay_policy': getattr(p2p_manager, 'relay_policy', 'broker_only'), + 'forced_failover': force_broker, + 'direct_attempted': False, + 'direct_attempt_count': 0, + }), 502 + + guidance = 'Could not connect to any advertised endpoint' if attempted_brokers: guidance += f" and no broker succeeded ({len(attempted_brokers)} attempted)" if p2p_manager.relay_policy != 'full_relay': @@ -3341,10 +3428,11 @@ def connect_introduced_peer(): return jsonify({ 'status': 'failed', 'message': guidance, + 'diagnostic_code': 'introduced_peer_direct_connect_failed', 'attempted_brokers': attempted_brokers, 'relay_policy': getattr(p2p_manager, 'relay_policy', 'broker_only'), 'forced_failover': force_broker, - 'direct_attempted': not force_broker, + 'direct_attempted': bool(endpoints) and not force_broker, 'direct_attempt_count': direct_attempt_count, }), 502 @@ -3386,7 +3474,57 @@ def reconnect_known_peer(): }) im = p2p_manager.identity_manager - endpoints = im.peer_endpoints.get(peer_id, []) + endpoints: list[str] = [] + seen_endpoints: set[str] = set() + if not endpoints: + stored_endpoints = list(im.peer_endpoints.get(peer_id, []) or []) + discovered_endpoints: list[str] = [] + get_discovered = getattr(p2p_manager, '_get_discovered_peer_endpoints', None) + if callable(get_discovered): + try: + discovered_endpoints = list(get_discovered(peer_id) or []) + except Exception: + discovered_endpoints = [] + + from ..network.invite import canonicalize_invite_endpoint, parse_invite_endpoint + import ipaddress + + def _endpoint_priority(endpoint: str) -> tuple[int, str]: + parsed = parse_invite_endpoint(endpoint) + if not parsed: + return (99, endpoint) + host, _, scheme = parsed + text = str(host or '').strip().lower() + if not text: + return (99, endpoint) + if text == 'localhost' or text.startswith('127.'): + return (3, endpoint) + try: + ip = ipaddress.ip_address(text) + except ValueError: + return (0 if scheme == 'wss' else 1, endpoint) + if ip.is_loopback or ip.is_unspecified: + return (3, endpoint) + if ip.is_private or ip.is_link_local: + return (2, endpoint) + return (0 if scheme == 'wss' else 1, endpoint) + + for endpoint in discovered_endpoints: + canon = canonicalize_invite_endpoint(endpoint) + if not canon or canon in seen_endpoints: + continue + seen_endpoints.add(canon) + endpoints.append(canon) + + stored_canonical = [] + for endpoint in stored_endpoints: + canon = canonicalize_invite_endpoint(endpoint) + if not canon or canon in seen_endpoints: + continue + seen_endpoints.add(canon) + stored_canonical.append(canon) + endpoints.extend(sorted(stored_canonical, key=_endpoint_priority)) + if not endpoints: return jsonify({'error': 'No known endpoints for this peer'}), 400 diff --git a/canopy/network/connection.py b/canopy/network/connection.py index ba54fd9b..c9840538 100644 --- a/canopy/network/connection.py +++ b/canopy/network/connection.py @@ -21,6 +21,22 @@ logger = logging.getLogger('canopy.network.connection') + +class _ExpectedWebSocketServerNoiseFilter(logging.Filter): + """Suppress noisy early-disconnect errors from the websockets server logger.""" + + def filter(self, record: logging.LogRecord) -> bool: + if record.levelno < logging.ERROR: + return True + if record.getMessage() != "opening handshake failed": + return True + exc = record.exc_info[1] if record.exc_info else None + return not isinstance(exc, websockets.exceptions.ConnectionClosedError) + + +_ws_server_logger = logging.getLogger('canopy.network.connection.websockets_server') +_ws_server_logger.addFilter(_ExpectedWebSocketServerNoiseFilter()) + _EXPECTED_CONNECT_ERRNOS = {51, 61, 64, 65, 110, 111, 113} @@ -156,6 +172,7 @@ class PeerConnection: canopy_version: Optional[str] = None protocol_version: Optional[int] = None endpoint_uri: Optional[str] = None + advertised_endpoints: List[str] = field(default_factory=list) failure_reason: Optional[str] = None failure_detail: Optional[str] = None _send_lock: asyncio.Lock = field(default_factory=asyncio.Lock, repr=False) @@ -190,6 +207,7 @@ def __init__(self, local_peer_id: str, identity_manager: Any, tls_key_path: Optional[str] = None, enable_tls: bool = False, handshake_capabilities: Optional[List[str]] = None, + advertised_endpoints_provider: Optional[Callable[[], List[str]]] = None, canopy_version: str = "0.1.0", protocol_version: int = 1, reject_protocol_mismatch: bool = False): @@ -214,6 +232,7 @@ def __init__(self, local_peer_id: str, identity_manager: Any, cap for cap in (str(item).strip() for item in base_capabilities) if cap ] or ['chat', 'files', 'voice'] + self.advertised_endpoints_provider = advertised_endpoints_provider self.local_canopy_version = str(canopy_version or '0.1.0').strip() or '0.1.0' self.local_protocol_version = self._coerce_protocol_version(protocol_version, default=1) self.reject_protocol_mismatch = bool(reject_protocol_mismatch) @@ -451,7 +470,40 @@ def _signed_optional_handshake_fields(payload: Dict[str, Any]) -> Dict[str, Any] extra['canopy_version'] = payload.get('canopy_version') if 'protocol_version' in payload: extra['protocol_version'] = payload.get('protocol_version') + if 'advertised_endpoints' in payload: + extra['advertised_endpoints'] = payload.get('advertised_endpoints') return extra + + @staticmethod + def _normalize_endpoint_payload(raw: Any) -> List[str]: + """Normalize a handshake endpoint payload into a trimmed string list.""" + if isinstance(raw, str): + raw_values = [raw] + elif isinstance(raw, (list, tuple, set)): + raw_values = list(raw) + else: + raw_values = [] + + out: List[str] = [] + seen = set() + for endpoint in raw_values: + text = str(endpoint or '').strip() + if not text or text in seen: + continue + seen.add(text) + out.append(text) + return out + + def _get_advertised_endpoints(self) -> List[str]: + """Return the local node's self-advertised dial-back endpoints.""" + provider = self.advertised_endpoints_provider + if not callable(provider): + return [] + try: + return self._normalize_endpoint_payload(provider() or []) + except Exception as exc: + logger.debug("Could not compute handshake advertised_endpoints: %s", exc) + return [] async def start(self) -> None: """Start connection manager and WebSocket server.""" @@ -473,6 +525,7 @@ async def start(self) -> None: ping_timeout=None, max_size=20 * 1024 * 1024, # 20MB — allow P2P image transfer compression="deflate", # permessage-deflate — ~40-70% savings on JSON frames + logger=_ws_server_logger, ) if self.enable_tls and self._server_ssl: serve_kwargs['ssl'] = self._server_ssl @@ -566,7 +619,7 @@ async def connect_to_peer(self, peer_id: str, address: str, port: int, scheme: O if normalized_scheme not in ('ws', 'wss'): normalized_scheme = '' - logger.info(f"Connecting to peer {peer_id} at {address}:{port}...") + logger.debug(f"Connecting to peer {peer_id} at {address}:{port}...") # Create connection object connection = PeerConnection( @@ -645,7 +698,7 @@ async def connect_to_peer(self, peer_id: str, address: str, port: int, scheme: O except (TimeoutError, asyncio.TimeoutError): # Expected when trying stale/unreachable addresses; we try other addresses or retry - logger.info( + logger.debug( f"Connection to {peer_id} at {address}:{port} timed out " "(will try other addresses or retry)" ) @@ -681,7 +734,8 @@ async def _handle_incoming_connection(self, websocket: Any, path: Optional[str] websocket: WebSocket connection path: Request path (provided by some websockets versions) """ - logger.info(f"Incoming connection from {websocket.remote_address}") + remote_address = getattr(websocket, 'remote_address', None) + logger.debug(f"Incoming connection from {remote_address}") try: # First message should be handshake with peer ID and signature @@ -690,7 +744,7 @@ async def _handle_incoming_connection(self, websocket: Any, path: Optional[str] peer_id = handshake_data.get('peer_id') if not peer_id: - logger.warning("Handshake missing peer_id") + logger.debug("Handshake missing peer_id from %s", remote_address) await websocket.close() return @@ -777,6 +831,9 @@ async def _handle_incoming_connection(self, websocket: Any, path: Optional[str] handshake_data.get('capabilities', []) ) } + connection.advertised_endpoints = self._normalize_endpoint_payload( + handshake_data.get('advertised_endpoints', []) + ) remote_protocol = connection.protocol_version or 1 remote_canopy = str(connection.canopy_version or '0.1.0') @@ -822,6 +879,7 @@ async def _handle_incoming_connection(self, websocket: Any, path: Optional[str] 'canopy_version': connection.canopy_version, 'protocol_version': connection.protocol_version, 'capabilities': list(connection.capabilities or {}), + 'advertised_endpoints': list(connection.advertised_endpoints or []), } self.on_peer_authenticated(peer_id, peer_meta) except TypeError: @@ -836,7 +894,7 @@ async def _handle_incoming_connection(self, websocket: Any, path: Optional[str] await websocket.close() except asyncio.TimeoutError: - logger.warning("Handshake timeout") + logger.debug("Handshake timeout from %s", remote_address) try: await websocket.close() except Exception: @@ -893,6 +951,7 @@ async def _perform_handshake(self, connection: PeerConnection) -> bool: **signed_payload, 'canopy_version': self.local_canopy_version, 'protocol_version': self.local_protocol_version, + 'advertised_endpoints': self._get_advertised_endpoints(), 'signature': signature.hex() } @@ -1010,6 +1069,9 @@ async def _perform_handshake(self, connection: PeerConnection) -> bool: response.get('capabilities', []) ) } + connection.advertised_endpoints = self._normalize_endpoint_payload( + response.get('advertised_endpoints', []) + ) remote_protocol = connection.protocol_version or 1 remote_canopy = str(connection.canopy_version or '0.1.0') @@ -1043,8 +1105,8 @@ async def _perform_handshake(self, connection: PeerConnection) -> bool: return True except Exception as e: - if isinstance(e, websockets.exceptions.ConnectionClosedOK): - logger.info(f"Handshake closed cleanly with {connection.peer_id}: {e}") + if isinstance(e, websockets.exceptions.ConnectionClosed): + logger.debug(f"Handshake with {connection.peer_id} closed by remote: {e}") else: logger.error(f"Handshake failed: {e}", exc_info=True) connection.failure_reason = type(e).__name__ @@ -1089,6 +1151,7 @@ async def _complete_handshake(self, connection: PeerConnection, **signed_payload, 'canopy_version': self.local_canopy_version, 'protocol_version': self.local_protocol_version, + 'advertised_endpoints': self._get_advertised_endpoints(), 'signature': signature.hex() } diff --git a/canopy/network/manager.py b/canopy/network/manager.py index abb09ba3..b4c7d235 100644 --- a/canopy/network/manager.py +++ b/canopy/network/manager.py @@ -1033,6 +1033,7 @@ async def _startup(self) -> None: tls_cert_path=getattr(network_config, 'tls_cert_path', None), tls_key_path=getattr(network_config, 'tls_key_path', None), handshake_capabilities=self._local_capabilities, + advertised_endpoints_provider=self._get_local_advertised_endpoints, canopy_version=self.local_canopy_version, protocol_version=self.local_protocol_version, reject_protocol_mismatch=bool( @@ -1345,6 +1346,23 @@ def _get_advertisable_peer_endpoints(self, peer_id: str) -> list[str]: return stored return self._get_discovered_peer_endpoints(peer_id) + def _get_local_advertised_endpoints(self) -> list[str]: + """Return local endpoints safe to advertise during handshake.""" + if not self.identity_manager or not self.identity_manager.local_identity: + return [] + try: + from .invite import generate_invite + + mesh_port = int(getattr(self.config.network, 'mesh_port', 0) or 0) + if mesh_port <= 0: + return [] + invite = generate_invite(self.identity_manager, mesh_port) + local_peer_id = self.local_identity.peer_id if self.local_identity else '' + return self._sanitize_endpoints(local_peer_id, list(invite.endpoints or [])) + except Exception as exc: + logger.debug("Could not compute local advertised endpoints: %s", exc) + return [] + @staticmethod def _merge_endpoint_lists(*endpoint_groups: list[str]) -> list[str]: """Merge endpoint lists while preserving order and removing duplicates.""" @@ -1358,6 +1376,33 @@ def _merge_endpoint_lists(*endpoint_groups: list[str]) -> list[str]: merged.append(endpoint) return merged + @staticmethod + def _endpoint_reconnect_priority(endpoint: str) -> tuple[int, str]: + """Prefer public/tunnel endpoints over stale private remembered ones.""" + parsed = P2PNetworkManager._parse_endpoint(endpoint) + if not parsed: + return (99, endpoint) + host, _, scheme = parsed + text = str(host or '').strip().lower() + if not text: + return (99, endpoint) + if text == 'localhost' or text.startswith('127.'): + return (3, endpoint) + + try: + import ipaddress + + ip = ipaddress.ip_address(text) + if ip.is_loopback or ip.is_unspecified: + return (3, endpoint) + if ip.is_private or ip.is_link_local: + return (2, endpoint) + return (0 if scheme == 'wss' else 1, endpoint) + except ValueError: + # Hostnames and tunnel domains are typically the best reconnect path + # once discovery is no longer giving us a live LAN address. + return (0 if scheme == 'wss' else 1, endpoint) + def _get_connectable_peer_endpoints(self, peer_id: str, prefer_discovered: bool = True) -> list[str]: """Return the best available endpoints for dialing a peer. @@ -1371,6 +1416,7 @@ def _get_connectable_peer_endpoints(self, peer_id: str, prefer_discovered: bool if stored != self.identity_manager.peer_endpoints.get(peer_id, []): self.identity_manager.peer_endpoints[peer_id] = stored self.identity_manager._save_known_peers() + stored = sorted(stored, key=self._endpoint_reconnect_priority) discovered = self._get_discovered_peer_endpoints(peer_id) if discovered: self._remember_discovered_peer_endpoints(peer_id) @@ -1402,6 +1448,67 @@ def _remember_discovered_peer_endpoints(self, peer_id: str) -> list[str]: ) return endpoints + def _remember_peer_advertised_endpoints( + self, + peer_id: str, + endpoints: list[str], + *, + source: str = 'peer_advertised', + ) -> list[str]: + """Persist explicit dial-back endpoints announced by an authenticated peer.""" + clean_peer_id = str(peer_id or '').strip() + if not clean_peer_id: + return [] + sanitized = self._sanitize_endpoints(clean_peer_id, endpoints or []) + if not sanitized: + return [] + + existing = list(self.identity_manager.peer_endpoints.get(clean_peer_id, []) or []) + merged = self._merge_endpoint_lists(existing, sanitized) + if merged != existing: + self.identity_manager.peer_endpoints[clean_peer_id] = merged + self.identity_manager._save_known_peers() + logger.info( + "Recorded %d advertised endpoint(s) for %s via %s", + len(sanitized), + clean_peer_id, + source, + ) + + introduced = self._introduced_peers.get(clean_peer_id) + if isinstance(introduced, dict): + prior_sources = list(introduced.get('endpoint_sources') or []) + if source and source not in prior_sources: + prior_sources.append(source) + introduced['endpoint_sources'] = prior_sources + introduced['endpoint_provenance'] = source + introduced['endpoints'] = self._merge_endpoint_lists( + list(introduced.get('endpoints') or []), + sanitized, + ) + return sanitized + + def _remember_live_peer_endpoints(self, peer_id: str) -> list[str]: + """Persist reusable endpoints learned from the authenticated live connection.""" + if not self.connection_manager or not peer_id: + return [] + conn = self.connection_manager.get_connection(peer_id) + if not conn: + return [] + + advertised = list(getattr(conn, 'advertised_endpoints', []) or []) + remembered = self._remember_peer_advertised_endpoints(peer_id, advertised) + if remembered: + return remembered + + introduced = self._introduced_peers.get(peer_id) if hasattr(self, '_introduced_peers') else None + if isinstance(introduced, dict) and not introduced.get('endpoints'): + logger.info( + "Peer %s is connected but has no reusable advertised endpoints; introductions may require broker fallback", + peer_id, + ) + return [] + def _record_endpoint_result( self, peer_id: str, @@ -1872,6 +1979,14 @@ def _on_incoming_peer_authenticated(self, peer_id: str, peer_meta: Optional[Dict status='connected', detail='Incoming connection authenticated', ) + if isinstance(peer_meta, dict): + try: + self._remember_peer_advertised_endpoints( + peer_id, + list(peer_meta.get('advertised_endpoints') or []), + ) + except Exception: + pass # Do not invent reconnect endpoints from socket origin addresses. Only # persist discovery-derived endpoints, which are authoritative enough # to survive reconnects and peer announcements. @@ -2029,6 +2144,7 @@ async def _run_post_connect_sync_impl(self, peer_id: str) -> None: # Only cancel reconnect once the current connection has survived the # settle window; otherwise a brief flap can strand the peer. self._cancel_reconnect(peer_id) + self._remember_live_peer_endpoints(peer_id) # Notify application layer if self.on_peer_connected: @@ -2214,6 +2330,12 @@ async def _send_peer_announcement_to(self, peer_id: str) -> None: if not identity: continue endpoints = self._get_advertisable_peer_endpoints(pid) + if not endpoints: + logger.info( + "Peer %s is connected but has no re-advertisable endpoints while announcing to %s; broker fallback may be required", + pid, + peer_id, + ) device_profile = None if self.get_peer_device_profile: try: @@ -2256,6 +2378,11 @@ async def _announce_new_peer_to_others(self, new_peer_id: str) -> None: if not identity: return endpoints = self._get_advertisable_peer_endpoints(new_peer_id) + if not endpoints: + logger.info( + "New peer %s has no re-advertisable endpoints; downstream peers may need broker fallback", + new_peer_id, + ) device_profile = None if self.get_peer_device_profile: try: @@ -2389,7 +2516,99 @@ def get_introduced_peers(self) -> list: """Return the list of peers introduced by our contacts.""" if not hasattr(self, '_introduced_peers'): self._introduced_peers = {} - return list(self._introduced_peers.values()) + rows: list[dict[str, Any]] = [] + for peer_id, record in self._introduced_peers.items(): + if not isinstance(record, dict): + continue + introduced_endpoints = self._sanitize_endpoints( + peer_id, + list(record.get('endpoints') or []), + ) + stored_endpoints = self._sanitize_endpoints( + peer_id, + self.identity_manager.peer_endpoints.get(peer_id, []), + ) + discovered_endpoints = self._get_discovered_peer_endpoints(peer_id) + merged_endpoints = self._merge_endpoint_lists( + discovered_endpoints, + introduced_endpoints, + stored_endpoints, + ) + endpoint_sources: list[str] = [] + if discovered_endpoints: + endpoint_sources.append('discovered') + if introduced_endpoints: + endpoint_sources.append('introduced') + if stored_endpoints: + endpoint_sources.append('stored') + for source in list(record.get('endpoint_sources') or []): + text = str(source or '').strip() + if text and text not in endpoint_sources: + endpoint_sources.append(text) + + broker_candidates = self.get_introduced_peer_broker_candidates(peer_id) + connect_strategy = 'direct_or_broker' + if not merged_endpoints: + connect_strategy = 'broker_only' if broker_candidates else 'unreachable' + + row = dict(record) + row['peer_id'] = peer_id + row['endpoints'] = merged_endpoints + row['endpoint_count'] = len(merged_endpoints) + row['endpoint_sources'] = endpoint_sources or ['none'] + row['connect_strategy'] = connect_strategy + row['broker_candidates'] = broker_candidates + rows.append(row) + return rows + + def get_introduced_peer_broker_candidates(self, peer_id: str) -> list[str]: + """Return currently reachable broker peers for a given introduced peer.""" + clean_peer_id = str(peer_id or '').strip() + if not clean_peer_id or not hasattr(self, '_introduced_peers'): + return [] + intro = self._introduced_peers.get(clean_peer_id) + if not isinstance(intro, dict): + return [] + + connected_peers: list[str] = [] + try: + connected_peers = list(self.get_connected_peers() or []) + except Exception: + connected_peers = [] + connected_set = set(connected_peers) + + local_peer_id = '' + try: + local_peer_id = self.get_peer_id() or '' + except Exception: + local_peer_id = '' + + introducers: list[str] = [] + introduced_via = intro.get('introduced_via', []) + if isinstance(introduced_via, list): + for pid in introduced_via: + text = str(pid or '').strip() + if text: + introducers.append(text) + introduced_by = str(intro.get('introduced_by') or '').strip() + if introduced_by: + introducers.append(introduced_by) + + broker_candidates: list[str] = [] + seen_brokers: set[str] = set() + + for pid in introducers: + if pid in connected_set and pid not in seen_brokers: + seen_brokers.add(pid) + broker_candidates.append(pid) + + for pid in connected_peers: + if not pid or pid == clean_peer_id or pid == local_peer_id or pid in seen_brokers: + continue + seen_brokers.add(pid) + broker_candidates.append(pid) + + return broker_candidates def get_peer_public_identity(self, peer_id: str) -> Dict[str, Any]: """Return a public-safe identity preview for a peer before trust.""" @@ -2688,15 +2907,17 @@ def send_broker_request(self, target_peer_id: str, 'x25519_public_key': base58.b58encode(local_id.x25519_public_key).decode(), } - # Gather our own endpoints - mesh_port = self.config.network.mesh_port - requester_endpoints = [] - try: - from .invite import get_local_ips - for ip in get_local_ips(): - requester_endpoints.append(f"{self.ws_scheme}://{ip}:{mesh_port}") - except Exception: - requester_endpoints.append(f"{self.ws_scheme}://0.0.0.0:{mesh_port}") + # Prefer the same public/tunnel endpoints we advertise in handshakes so + # brokered connect-backs do not regress to LAN-only addresses. + requester_endpoints = list(self._get_local_advertised_endpoints() or []) + if not requester_endpoints: + mesh_port = self.config.network.mesh_port + try: + from .invite import get_local_ips + for ip in get_local_ips(): + requester_endpoints.append(f"{self.ws_scheme}://{ip}:{mesh_port}") + except Exception: + requester_endpoints.append(f"{self.ws_scheme}://0.0.0.0:{mesh_port}") future = asyncio.run_coroutine_threadsafe( self.message_router.send_broker_request( @@ -2718,7 +2939,7 @@ def send_broker_request(self, target_peer_id: str, via_peer=via_peer_id, ) else: - logger.warning( + logger.info( f"Broker request via {via_peer_id} for {target_peer_id} " f"was not routed immediately" ) diff --git a/canopy/ui/templates/connect.html b/canopy/ui/templates/connect.html index e75a9e52..1ac6b914 100644 --- a/canopy/ui/templates/connect.html +++ b/canopy/ui/templates/connect.html @@ -570,7 +570,7 @@
These peers were introduced by your connected contacts. Click Connect to establish a direct link.
+These peers were introduced by your connected contacts. If a peer has no direct endpoint, Canopy will fall back to broker-assisted connection through the introducer.
|
- {% for ep in peer.endpoints %}
- {{ ep }}
- {% endfor %}
+ {% if peer.endpoints %}
+ {% for ep in peer.endpoints %}
+ {{ ep }}
+ {% endfor %}
+ {% if peer.endpoint_sources %}
+
+ {% for source in peer.endpoint_sources %}
+ {{ source }}
+ {% endfor %}
+
+ {% endif %}
+ {% else %}
+ No direct endpoint announced
+ {% if peer.broker_candidates %}
+ Broker path available via {{ peer.broker_candidates|length }} peer{{ '' if peer.broker_candidates|length == 1 else 's' }}
+ {% else %}
+ Needs a fresh invite or active broker
+ {% endif %}
+ {% endif %}
|
{% if ibydev and ibydev.display_name %}
@@ -626,7 +642,8 @@ Peers via Your Contacts | @@ -908,7 +925,7 @@ |