diff --git a/gossipsub-interop/Makefile b/gossipsub-interop/Makefile index 72c43d806..89d9d02ee 100644 --- a/gossipsub-interop/Makefile +++ b/gossipsub-interop/Makefile @@ -4,6 +4,7 @@ all: binaries binaries: cd go-libp2p && go build -linkshared -o gossipsub-bin cd rust-libp2p && cargo build + cd py-libp2p && chmod +x main.py # Clean all generated shadow simulation files clean: diff --git a/gossipsub-interop/experiment.py b/gossipsub-interop/experiment.py index 64135b3ca..b29531e58 100644 --- a/gossipsub-interop/experiment.py +++ b/gossipsub-interop/experiment.py @@ -137,6 +137,8 @@ def composition(preset_name: str) -> List[Binary]: "rust-libp2p/target/debug/rust-libp2p-gossip", percent_of_nodes=100 ) ] + case "all-python": + return [Binary("py-libp2p/main.py", percent_of_nodes=100)] case "rust-and-go": return [ Binary( @@ -144,6 +146,26 @@ def composition(preset_name: str) -> List[Binary]: ), Binary("go-libp2p/gossipsub-bin", percent_of_nodes=50), ] + case "python-and-go": + return [ + Binary("py-libp2p/main.py", percent_of_nodes=50), + Binary("go-libp2p/gossipsub-bin", percent_of_nodes=50), + ] + case "python-and-rust": + return [ + Binary("py-libp2p/main.py", percent_of_nodes=50), + Binary( + "rust-libp2p/target/debug/rust-libp2p-gossip", percent_of_nodes=50 + ), + ] + case "all-three": + return [ + Binary("py-libp2p/main.py", percent_of_nodes=34), + Binary("go-libp2p/gossipsub-bin", percent_of_nodes=33), + Binary( + "rust-libp2p/target/debug/rust-libp2p-gossip", percent_of_nodes=33 + ), + ] raise ValueError(f"Unknown preset name: {preset_name}") diff --git a/gossipsub-interop/py-libp2p/main.py b/gossipsub-interop/py-libp2p/main.py new file mode 100755 index 000000000..a52191889 --- /dev/null +++ b/gossipsub-interop/py-libp2p/main.py @@ -0,0 +1,370 @@ +#!/usr/bin/env python3 +""" +Python implementation for GossipSub interoperability testing. + +This implementation follows the specification in test-specs/implementation.md +and provides compatibility with the existing Go and Rust implementations. +""" + +import argparse +import json +import logging +import socket +import struct +import sys +import time +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional + +import trio +import multiaddr +from libp2p import new_host +from libp2p.crypto.ed25519 import create_new_key_pair +from libp2p.crypto.keys import KeyPair +from libp2p.abc import IHost +from libp2p.peer.id import ID as PeerID +from libp2p.peer.peerinfo import PeerInfo, info_from_p2p_addr +from libp2p.pubsub.gossipsub import GossipSub +from libp2p.pubsub.pubsub import Pubsub +from libp2p.stream_muxer.yamux.yamux import PROTOCOL_ID as YAMUX_PROTOCOL_ID, Yamux +from libp2p.tools.async_service.trio_service import background_trio_service +from libp2p.custom_types import TProtocol + +# Configure logging to stderr (diagnostics) and stdout (structured output) +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + stream=sys.stderr, +) +logger = logging.getLogger("py-libp2p-gossip") + +# Protocol constants +GOSSIPSUB_PROTOCOL_ID = TProtocol("/meshsub/1.0.0") + + +def node_priv_key(node_id: int) -> KeyPair: + """ + Generate a deterministic ED25519 private key from node ID. + + This follows the specification in test-specs/implementation.md: + "Implementations MUST deterministically generate their ED25519 peer ID + from their node ID by using their little-endian encoded node ID as their ED25519 key." + """ + # Create a 32-byte seed with the node ID in little-endian format + seed = bytearray(32) + struct.pack_into(" str: + """ + Calculate message ID from message data. + + From the specification: + "Message ID is calculated by reading the first 8 bytes of the message and + interpreting them as a big endian encoded 64 bit unsigned integer." + """ + if len(data) < 8: + # Pad with zeros if data is less than 8 bytes + padded_data = data + b'\x00' * (8 - len(data)) + else: + padded_data = data[:8] + + # Interpret as big-endian uint64 and format as base 10 integer + message_id = struct.unpack(">Q", padded_data)[0] + return str(message_id) + + +def log_structured(msg: str, **kwargs) -> None: + """ + Log structured JSON to stdout as required by the specification. + + All STDOUT logs must include: + - time: The RFC3339 timestamp of the log entry + - msg: The message being logged + """ + log_entry = { + "time": datetime.now(timezone.utc).isoformat(), + "msg": msg, + **kwargs + } + print(json.dumps(log_entry), flush=True) + + +class ShadowConnector: + """Connector implementation for Shadow simulator environment.""" + + async def connect_to(self, host: IHost, node_id: int) -> None: + """Connect to another node by its node ID.""" + try: + # Resolve the hostname for the target node + hostname = f"node{node_id}" + addrs = socket.getaddrinfo(hostname, None) + if not addrs: + raise Exception(f"Failed to resolve hostname: {hostname}") + + ip_addr = addrs[0][4][0] + + # Generate the peer ID for the target node + target_key_pair = node_priv_key(node_id) + target_peer_id = PeerID.from_pubkey(target_key_pair.public_key) + + # Create multiaddr for the target peer + maddr_str = f"/ip4/{ip_addr}/tcp/9000/p2p/{target_peer_id}" + + # Parse and connect + peer_info = info_from_p2p_addr(maddr_str) + await host.connect(peer_info) + + logger.debug(f"Connected to node{node_id} at {maddr_str}") + + except Exception as e: + logger.error(f"Failed to connect to node{node_id}: {e}") + raise + + +class ExperimentParams: + """Container for experiment parameters loaded from JSON.""" + + def __init__(self, data: Dict[str, Any]): + self.script = data.get("script", []) + + +class GossipSubInteropNode: + """Main node implementation for GossipSub interoperability testing.""" + + def __init__(self, node_id: int, params: ExperimentParams): + self.node_id = node_id + self.params = params + self.start_time = time.time() + self.connector = ShadowConnector() + self.host: Optional[IHost] = None + self.pubsub: Optional[Pubsub] = None + self.gossipsub: Optional[GossipSub] = None + self.subscriptions: Dict[str, Any] = {} + self.topic_validation_delays: Dict[str, float] = {} + + async def setup_host(self) -> None: + """Set up the libp2p host with gossipsub.""" + # Generate deterministic key pair + key_pair = node_priv_key(self.node_id) + peer_id = PeerID.from_pubkey(key_pair.public_key) + + # Log peer ID as required by specification + log_structured( + "PeerID", + id=str(peer_id), + node_id=self.node_id + ) + + # Create host + self.host = new_host( + key_pair=key_pair, + muxer_opt={YAMUX_PROTOCOL_ID: Yamux}, + listen_addrs=[multiaddr.Multiaddr("/ip4/0.0.0.0/tcp/9000")] + ) + + # Create gossipsub with default parameters + # These can be overridden by InitGossipSub instructions + self.gossipsub = GossipSub( + protocols=[GOSSIPSUB_PROTOCOL_ID], + degree=6, # D + degree_low=4, # Dlo + degree_high=12, # Dhi + time_to_live=30, # FanoutTTL in seconds + gossip_window=3, # HistoryLength + gossip_history=5, # HistoryGossip + heartbeat_initial_delay=0.1, # HeartbeatInitialDelay in seconds + heartbeat_interval=1.0, # HeartbeatInterval in seconds + ) + + # Create pubsub + self.pubsub = Pubsub(self.host, self.gossipsub) + + logger.info(f"Node {self.node_id} initialized with peer ID: {peer_id}") + + async def execute_instruction(self, instruction: Dict[str, Any]) -> None: + """Execute a single script instruction.""" + instr_type = instruction.get("type") + + if instr_type == "connect": + await self._handle_connect(instruction) + elif instr_type == "subscribeToTopic": + await self._handle_subscribe(instruction) + elif instr_type == "publish": + await self._handle_publish(instruction) + elif instr_type == "waitUntil": + await self._handle_wait_until(instruction) + elif instr_type == "setTopicValidationDelay": + await self._handle_set_validation_delay(instruction) + elif instr_type == "initGossipSub": + await self._handle_init_gossipsub(instruction) + elif instr_type == "ifNodeIDEquals": + await self._handle_if_node_id_equals(instruction) + else: + logger.warning(f"Unknown instruction type: {instr_type}") + + async def _handle_connect(self, instruction: Dict[str, Any]) -> None: + """Handle connect instruction.""" + connect_to = instruction.get("connectTo", []) + for target_node_id in connect_to: + try: + await self.connector.connect_to(self.host, target_node_id) + except Exception as e: + logger.error(f"Failed to connect to node {target_node_id}: {e}") + + async def _handle_subscribe(self, instruction: Dict[str, Any]) -> None: + """Handle subscribeToTopic instruction.""" + topic_id = instruction.get("topicID") + if topic_id and self.pubsub: + subscription = await self.pubsub.subscribe(topic_id) + self.subscriptions[topic_id] = subscription + logger.info(f"Subscribed to topic: {topic_id}") + + # Start message receiving task for this topic in the nursery + async def receive_messages(): + while True: + try: + message = await subscription.get() + # Log received message as required by specification + message_id = calc_message_id(message.data) + log_structured( + "Received Message", + id=message_id, + from_=str(message.from_id) if message.from_id else "", + topic=topic_id + ) + except Exception as e: + logger.error(f"Error receiving message on topic {topic_id}: {e}") + await trio.sleep(0.1) + + # Store the receive task function to start it later in the nursery + self.subscriptions[f"{topic_id}_receiver"] = receive_messages + + async def _handle_publish(self, instruction: Dict[str, Any]) -> None: + """Handle publish instruction.""" + message_id = instruction.get("messageID") + topic_id = instruction.get("topicID") + message_size_bytes = instruction.get("messageSizeBytes", 0) + + if self.pubsub and topic_id is not None: + # Create message data with the message ID in the first 8 bytes + message_data = bytearray(message_size_bytes) + if message_size_bytes >= 8: + struct.pack_into(">Q", message_data, 0, message_id) + + # Apply topic validation delay if set + if topic_id in self.topic_validation_delays: + delay = self.topic_validation_delays[topic_id] + await trio.sleep(delay) + + await self.pubsub.publish(topic_id, bytes(message_data)) + logger.info(f"Published message {message_id} to topic {topic_id}") + + async def _handle_wait_until(self, instruction: Dict[str, Any]) -> None: + """Handle waitUntil instruction.""" + elapsed_seconds = instruction.get("elapsedSeconds", 0) + current_elapsed = time.time() - self.start_time + + if elapsed_seconds > current_elapsed: + wait_time = elapsed_seconds - current_elapsed + logger.debug(f"Waiting {wait_time:.2f} seconds until {elapsed_seconds}s elapsed") + await trio.sleep(wait_time) + + async def _handle_set_validation_delay(self, instruction: Dict[str, Any]) -> None: + """Handle setTopicValidationDelay instruction.""" + topic_id = instruction.get("topicID") + delay_seconds = instruction.get("delaySeconds", 0.0) + + if topic_id: + self.topic_validation_delays[topic_id] = delay_seconds + logger.info(f"Set validation delay for topic {topic_id}: {delay_seconds}s") + + async def _handle_init_gossipsub(self, instruction: Dict[str, Any]) -> None: + """Handle initGossipSub instruction.""" + # This would reconfigure gossipsub parameters + # For now, we log that we received the instruction + gossip_params = instruction.get("gossipSubParams", {}) + logger.info(f"InitGossipSub instruction received with params: {gossip_params}") + # TODO: Apply the gossipsub parameters to reconfigure the router + + async def _handle_if_node_id_equals(self, instruction: Dict[str, Any]) -> None: + """Handle ifNodeIDEquals instruction.""" + target_node_id = instruction.get("nodeID") + inner_instruction = instruction.get("instruction") + + if target_node_id == self.node_id and inner_instruction: + await self.execute_instruction(inner_instruction) + + async def run_experiment(self) -> None: + """Run the complete experiment.""" + await self.setup_host() + + async with trio.open_nursery() as nursery: + async with background_trio_service(self.pubsub): + async with background_trio_service(self.gossipsub): + await self.pubsub.wait_until_ready() + logger.info("Pubsub and GossipSub services started") + + # Start message receiving tasks for any existing subscriptions + for key, receiver in self.subscriptions.items(): + if key.endswith("_receiver") and callable(receiver): + nursery.start_soon(receiver) + + # Execute all script instructions + for instruction in self.params.script: + await self.execute_instruction(instruction) + + # Start any new receiver tasks that were created + for key, receiver in list(self.subscriptions.items()): + if key.endswith("_receiver") and callable(receiver): + nursery.start_soon(receiver) + # Remove from dict to avoid starting again + del self.subscriptions[key] + + # Keep running to handle ongoing message delivery + logger.info("All instructions executed, keeping node alive for message delivery") + await trio.sleep(3600) # Run for up to 1 hour + + +async def main() -> None: + """Main entry point.""" + parser = argparse.ArgumentParser(description="Python GossipSub interop test node") + parser.add_argument("--params", required=True, help="Path to params.json file") + + args = parser.parse_args() + + # Read experiment parameters + try: + with open(args.params, 'r') as f: + params_data = json.load(f) + params = ExperimentParams(params_data) + except Exception as e: + logger.error(f"Failed to load params file {args.params}: {e}") + sys.exit(1) + + # Get node ID from hostname + try: + hostname = socket.gethostname() + node_id = int(hostname.replace("node", "")) + except Exception as e: + # For testing outside Shadow, use a default node_id + node_id = 1 + logger.info(f"Using default node ID (not in Shadow): {node_id}") + + # Create and run the node + node = GossipSubInteropNode(node_id, params) + + try: + # Run the experiment (we're already inside trio.run from main) + await node.run_experiment() + except KeyboardInterrupt: + logger.info("Node terminated by user") + except Exception as e: + logger.error(f"Node failed: {e}") + sys.exit(1) + + +if __name__ == "__main__": + trio.run(main) diff --git a/gossipsub-interop/py-libp2p/pyproject.toml b/gossipsub-interop/py-libp2p/pyproject.toml new file mode 100644 index 000000000..584608b77 --- /dev/null +++ b/gossipsub-interop/py-libp2p/pyproject.toml @@ -0,0 +1,25 @@ +[project] +name = "py-libp2p-gossip-interop" +version = "0.1.0" +description = "Python implementation for GossipSub interoperability testing" +authors = [ + {name = "py-libp2p contributors"} +] +readme = "README.md" +requires-python = ">=3.11" +dependencies = [ + "libp2p >= 0.4.0", + "trio >= 0.22.0", + "multiaddr >= 0.0.9", + "base58 >= 2.1.0", +] + +[project.scripts] +py-libp2p-gossip = "main:main" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["."] diff --git a/gossipsub-interop/py-libp2p/test_params.json b/gossipsub-interop/py-libp2p/test_params.json new file mode 100644 index 000000000..fd2e57c5e --- /dev/null +++ b/gossipsub-interop/py-libp2p/test_params.json @@ -0,0 +1,38 @@ +{ + "node_id": 1, + "gossipsub_params": { + "D": 6, + "D_low": 4, + "D_high": 12, + "D_lazy": 6, + "heartbeat_interval": 1000, + "fanout_ttl": 60000, + "mcache_len": 5, + "mcache_gossip": 3, + "seen_ttl": 120000 + }, + "script": [ + { + "action": "InitGossipSub", + "gossipsubParams": { + "D": 6, + "D_low": 4, + "D_high": 12, + "D_lazy": 6, + "heartbeat_interval": 1000, + "fanout_ttl": 60000, + "mcache_len": 5, + "mcache_gossip": 3, + "seen_ttl": 120000 + } + }, + { + "action": "SubscribeToTopic", + "topicID": "test-topic" + }, + { + "action": "WaitUntil", + "duration": 5000 + } + ] +} diff --git a/gossipsub-interop/run-docker.sh b/gossipsub-interop/run-docker.sh new file mode 100755 index 000000000..f582bb513 --- /dev/null +++ b/gossipsub-interop/run-docker.sh @@ -0,0 +1,69 @@ +#!/bin/bash + +# Script to run gossipsub interop tests using Docker +# This script handles the Docker setup and provides easy access to the Shadow simulator + +set -e + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +NC='\033[0m' # No Color + +# Function to print colored output +print_info() { + echo -e "${GREEN}[INFO]${NC} $1" +} + +print_warning() { + echo -e "${YELLOW}[WARNING]${NC} $1" +} + +print_error() { + echo -e "${RED}[ERROR]${NC} $1" +} + +# Check if Docker is running +if ! docker info > /dev/null 2>&1; then + print_error "Docker is not running. Please start Docker and try again." + exit 1 +fi + +# Check if we're on macOS and warn about Shadow compatibility +if [[ "$OSTYPE" == "darwin"* ]]; then + print_warning "Running on macOS. Shadow simulator only supports Linux, so we're using Docker." +fi + +# Build the Docker image if it doesn't exist or if --build is passed +if [[ "$1" == "--build" ]] || ! docker images | grep -q gossipsub-interop; then + print_info "Building Docker image for gossipsub interop tests..." + docker-compose build + shift # Remove --build from arguments +fi + +# If no arguments provided, show help +if [ $# -eq 0 ]; then + print_info "Running gossipsub interop tests with Docker" + print_info "Usage: $0 [--build] [gossipsub-interop-arguments]" + print_info "Examples:" + print_info " $0 --help # Show help" + print_info " $0 --node_count 100 --composition rust-and-go --scenario subnet-blob-msg" + print_info " $0 --build --node_count 50 --composition all-go --scenario simple-fanout" + echo + print_info "Running help command..." + docker-compose run --rm gossipsub-interop --help + exit 0 +fi + +# Run the gossipsub interop tests with the provided arguments +print_info "Running gossipsub interop tests with arguments: $*" +docker-compose run --rm gossipsub-interop "$@" + +# Check if output directory was created and show results +if [ -d "*.data" ]; then + print_info "Test completed! Results are available in the .data directory" + print_info "You can view the plots and analysis in the plots/ subdirectory" +else + print_warning "No output directory found. The test may not have completed successfully." +fi