From 2838d7da3a5a8dc8745b52e05b8c93d458458676 Mon Sep 17 00:00:00 2001 From: Matee Ullah Malik Date: Tue, 24 Jun 2025 18:19:40 +0500 Subject: [PATCH 1/2] feat: enhance cascade download functionality with concurrent processing and improved error handling --- Makefile | 9 +- sdk/action/client.go | 16 +-- sdk/adapters/supernodeservice/adapter.go | 28 ++-- sdk/task/download.go | 156 +++++++++++++++++++---- sdk/task/helpers.go | 18 +++ sdk/task/manager.go | 29 +++-- tests/scripts/install-lumera.sh | 55 ++++---- tests/system/e2e_cascade_test.go | 53 ++++++-- 8 files changed, 272 insertions(+), 92 deletions(-) diff --git a/Makefile b/Makefile index 0d20d887..fe4536d9 100644 --- a/Makefile +++ b/Makefile @@ -61,11 +61,16 @@ CONFIG_FILE3=tests/system/config.test-3.yml SETUP_SCRIPT=tests/scripts/setup-supernodes.sh # Install Lumera +# Optional: specify lumera binary path to skip download +LUMERAD_BINARY ?= +# Optional: specify installation mode (latest-release, latest-tag, or vX.Y.Z) +INSTALL_MODE ?=latest-tag + install-lumera: @echo "Installing Lumera..." @chmod +x tests/scripts/install-lumera.sh - @sudo tests/scripts/install-lumera.sh latest-tag - + @sudo LUMERAD_BINARY="$(LUMERAD_BINARY)" tests/scripts/install-lumera.sh $(INSTALL_MODE) + @echo "PtTDUHythfRfXHh63yzyiGDid4TZj2P76Zd,18749999981413" > ~/claims.csv # Setup supernode environments setup-supernodes: @echo "Setting up all supernode environments..." diff --git a/sdk/action/client.go b/sdk/action/client.go index ca19ec57..b6bf268c 100644 --- a/sdk/action/client.go +++ b/sdk/action/client.go @@ -16,16 +16,15 @@ import ( // //go:generate mockery --name=Client --output=testutil/mocks --outpkg=mocks --filename=client_mock.go type Client interface { - // - signature: Base64-encoded cryptographic signature of the file's data hash (blake3) - // 1- hash(blake3) > 2- sign > 3- base64 - // The signature must be created by the same account that created the Lumera action. - // It must be a digital signature of the data hash found in the action's CASCADE metadata. + // StartCascade initiates a cascade operation with file path, action ID, and signature + // signature: Base64-encoded signature of file's blake3 hash by action creator StartCascade(ctx context.Context, filePath string, actionID string, signature string) (string, error) DeleteTask(ctx context.Context, taskID string) error GetTask(ctx context.Context, taskID string) (*task.TaskEntry, bool) SubscribeToEvents(ctx context.Context, eventType event.EventType, handler event.Handler) error SubscribeToAllEvents(ctx context.Context, handler event.Handler) error - DownloadCascade(ctx context.Context, actionID, outputPath string) (string, error) + // DownloadCascade downloads cascade to outputDir, filename determined by action ID + DownloadCascade(ctx context.Context, actionID, outputDir string) (string, error) } // ClientImpl implements the Client interface @@ -130,16 +129,13 @@ func (c *ClientImpl) SubscribeToAllEvents(ctx context.Context, handler event.Han return nil } -func (c *ClientImpl) DownloadCascade( - ctx context.Context, - actionID, outputPath string, -) (string, error) { +func (c *ClientImpl) DownloadCascade(ctx context.Context, actionID, outputDir string) (string, error) { if actionID == "" { return "", fmt.Errorf("actionID is empty") } - taskID, err := c.taskManager.CreateDownloadTask(ctx, actionID, outputPath) + taskID, err := c.taskManager.CreateDownloadTask(ctx, actionID, outputDir) if err != nil { return "", fmt.Errorf("create download task: %w", err) } diff --git a/sdk/adapters/supernodeservice/adapter.go b/sdk/adapters/supernodeservice/adapter.go index 7b4cc7a4..01bda399 100644 --- a/sdk/adapters/supernodeservice/adapter.go +++ b/sdk/adapters/supernodeservice/adapter.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "os" + "path/filepath" "github.com/LumeraProtocol/supernode/gen/supernode/action/cascade" "github.com/LumeraProtocol/supernode/pkg/net" @@ -178,16 +179,20 @@ func (a *cascadeAdapter) CascadeSupernodeDownload( ActionId: in.ActionID, }, opts...) if err != nil { - a.logger.Error(ctx, "failed to create download stream", - "action_id", in.ActionID, "error", err) + a.logger.Error(ctx, "failed to create download stream", "action_id", in.ActionID, "error", err) return nil, err } // 2. Prepare destination file + // Create directory structure if it doesn't exist + if err := os.MkdirAll(filepath.Dir(in.OutputPath), 0755); err != nil { + a.logger.Error(ctx, "failed to create output directory", "path", filepath.Dir(in.OutputPath), "error", err) + return nil, fmt.Errorf("create output directory: %w", err) + } + outFile, err := os.Create(in.OutputPath) if err != nil { - a.logger.Error(ctx, "failed to create output file", - "path", in.OutputPath, "error", err) + a.logger.Error(ctx, "failed to create output file", "path", in.OutputPath, "error", err) return nil, fmt.Errorf("create output file: %w", err) } defer outFile.Close() @@ -211,10 +216,7 @@ func (a *cascadeAdapter) CascadeSupernodeDownload( // 3a. Progress / event message case *cascade.DownloadResponse_Event: - a.logger.Info(ctx, "supernode event", - "event_type", x.Event.EventType, - "message", x.Event.Message, - "action_id", in.ActionID) + a.logger.Info(ctx, "supernode event", "event_type", x.Event.EventType, "message", x.Event.Message, "action_id", in.ActionID) if in.EventLogger != nil { in.EventLogger(ctx, toSdkEvent(x.Event.EventType), x.Event.Message, event.EventData{ @@ -237,17 +239,11 @@ func (a *cascadeAdapter) CascadeSupernodeDownload( bytesWritten += int64(len(data)) chunkIndex++ - a.logger.Debug(ctx, "received chunk", - "chunk_index", chunkIndex, - "chunk_size", len(data), - "bytes_written", bytesWritten) + a.logger.Debug(ctx, "received chunk", "chunk_index", chunkIndex, "chunk_size", len(data), "bytes_written", bytesWritten) } } - a.logger.Info(ctx, "download complete", - "bytes_written", bytesWritten, - "path", in.OutputPath, - "action_id", in.ActionID) + a.logger.Info(ctx, "download complete", "bytes_written", bytesWritten, "path", in.OutputPath, "action_id", in.ActionID) return &CascadeSupernodeDownloadResponse{ Success: true, diff --git a/sdk/task/download.go b/sdk/task/download.go index 36b99c44..147ea9f0 100644 --- a/sdk/task/download.go +++ b/sdk/task/download.go @@ -65,33 +65,42 @@ func (t *CascadeDownloadTask) downloadFromSupernodes(ctx context.Context, supern OutputPath: t.outputPath, } - var lastErr error - for idx, sn := range supernodes { - t.LogEvent(ctx, event.SDKDownloadAttempt, "attempting download from super-node", event.EventData{ - event.KeySupernode: sn.GrpcEndpoint, - event.KeySupernodeAddress: sn.CosmosAddress, - event.KeyIteration: idx + 1, - }) + // Process supernodes in pairs + var allErrors []error + for i := 0; i < len(supernodes); i += 2 { + // Determine how many supernodes to try in this batch (1 or 2) + batchSize := 2 + if i+1 >= len(supernodes) { + batchSize = 1 + } - if err := t.attemptDownload(ctx, sn, clientFactory, req); err != nil { - lastErr = err - t.LogEvent(ctx, event.SDKDownloadFailure, "download from super-node failed", event.EventData{ - event.KeySupernode: sn.GrpcEndpoint, - event.KeySupernodeAddress: sn.CosmosAddress, - event.KeyIteration: idx + 1, - event.KeyError: err.Error(), + t.logger.Info(ctx, "attempting concurrent download from supernode batch", "batch_start", i, "batch_size", batchSize) + + // Try downloading from this batch concurrently + result, batchErrors := t.attemptConcurrentDownload(ctx, supernodes[i:i+batchSize], clientFactory, req, i) + + if result != nil { + // Success! Log and return + t.LogEvent(ctx, event.SDKDownloadSuccessful, "download successful", event.EventData{ + event.KeySupernode: result.SupernodeEndpoint, + event.KeySupernodeAddress: result.SupernodeAddress, + event.KeyIteration: result.Iteration, }) - continue + return nil } - t.LogEvent(ctx, event.SDKDownloadSuccessful, "download successful", event.EventData{ - event.KeySupernode: sn.GrpcEndpoint, - event.KeySupernodeAddress: sn.CosmosAddress, - event.KeyIteration: idx + 1, - }) - return nil + // Both (or the single one) failed, collect errors + allErrors = append(allErrors, batchErrors...) + + // Log batch failure + t.logger.Warn(ctx, "download batch failed", "batch_start", i, "batch_size", batchSize, "errors", len(batchErrors)) + } + + // All attempts failed + if len(allErrors) > 0 { + return fmt.Errorf("failed to download from all super-nodes: %v", allErrors) } - return fmt.Errorf("failed to download from all super-nodes: %w", lastErr) + return fmt.Errorf("no supernodes available for download") } func (t *CascadeDownloadTask) attemptDownload( @@ -129,3 +138,106 @@ func (t *CascadeDownloadTask) attemptDownload( return nil } + +// downloadResult holds the result of a successful download attempt +type downloadResult struct { + SupernodeAddress string + SupernodeEndpoint string + Iteration int +} + +// attemptConcurrentDownload tries to download from multiple supernodes concurrently +// Returns the first successful result or all errors if all attempts fail +func (t *CascadeDownloadTask) attemptConcurrentDownload( + ctx context.Context, + batch lumera.Supernodes, + factory *net.ClientFactory, + req *supernodeservice.CascadeSupernodeDownloadRequest, + baseIteration int, +) (*downloadResult, []error) { + // Create a cancellable context for this batch + batchCtx, cancelBatch := context.WithCancel(ctx) + defer cancelBatch() + + // Channels for results + type attemptResult struct { + success *downloadResult + err error + idx int + } + resultCh := make(chan attemptResult, len(batch)) + + // Start concurrent download attempts + for idx, sn := range batch { + iteration := baseIteration + idx + 1 + + // Log download attempt + t.LogEvent(ctx, event.SDKDownloadAttempt, "attempting download from super-node", event.EventData{ + event.KeySupernode: sn.GrpcEndpoint, + event.KeySupernodeAddress: sn.CosmosAddress, + event.KeyIteration: iteration, + }) + + go func(sn lumera.Supernode, idx int, iter int) { + // Create a copy of the request for this goroutine + reqCopy := &supernodeservice.CascadeSupernodeDownloadRequest{ + ActionID: req.ActionID, + TaskID: req.TaskID, + OutputPath: req.OutputPath, + } + + err := t.attemptDownload(batchCtx, sn, factory, reqCopy) + if err != nil { + resultCh <- attemptResult{ + err: err, + idx: idx, + } + return + } + + resultCh <- attemptResult{ + success: &downloadResult{ + SupernodeAddress: sn.CosmosAddress, + SupernodeEndpoint: sn.GrpcEndpoint, + Iteration: iter, + }, + idx: idx, + } + }(sn, idx, iteration) + } + + // Collect results + var errors []error + for i := 0; i < len(batch); i++ { + select { + case result := <-resultCh: + if result.success != nil { + // Success! Cancel other attempts and return + cancelBatch() + // Drain remaining results to avoid goroutine leaks + go func() { + for j := i + 1; j < len(batch); j++ { + <-resultCh + } + }() + return result.success, nil + } + + // Log failure + sn := batch[result.idx] + t.LogEvent(ctx, event.SDKDownloadFailure, "download from super-node failed", event.EventData{ + event.KeySupernode: sn.GrpcEndpoint, + event.KeySupernodeAddress: sn.CosmosAddress, + event.KeyIteration: baseIteration + result.idx + 1, + event.KeyError: result.err.Error(), + }) + errors = append(errors, result.err) + + case <-ctx.Done(): + return nil, []error{ctx.Err()} + } + } + + // All attempts in this batch failed + return nil, errors +} diff --git a/sdk/task/helpers.go b/sdk/task/helpers.go index 1646a326..44c9a662 100644 --- a/sdk/task/helpers.go +++ b/sdk/task/helpers.go @@ -4,6 +4,8 @@ import ( "context" "encoding/base64" "fmt" + "path/filepath" + "strings" "github.com/LumeraProtocol/supernode/sdk/adapters/lumera" ) @@ -99,3 +101,19 @@ func (m *ManagerImpl) validateDownloadAction(ctx context.Context, actionID strin return action, nil } + +// Helper function to ensure output path has the correct filename +func ensureOutputPathWithFilename(outputPath, filename string) string { + // If outputPath is empty, just return the filename + if outputPath == "" { + return filename + } + + // Check if the path already ends with the filename + if strings.HasSuffix(outputPath, filename) { + return outputPath + } + + // Otherwise, append the filename to the path + return filepath.Join(outputPath, filename) +} diff --git a/sdk/task/manager.go b/sdk/task/manager.go index 1f1f0a19..83bf39c4 100644 --- a/sdk/task/manager.go +++ b/sdk/task/manager.go @@ -3,6 +3,7 @@ package task import ( "context" "fmt" + "path" "github.com/LumeraProtocol/supernode/sdk/adapters/lumera" "github.com/LumeraProtocol/supernode/sdk/config" @@ -229,21 +230,30 @@ func (m *ManagerImpl) Close(ctx context.Context) { } } -func (m *ManagerImpl) CreateDownloadTask( - ctx context.Context, - actionID string, - outputPath string, -) (string, error) { - +func (m *ManagerImpl) CreateDownloadTask(ctx context.Context, actionID string, outputDir string) (string, error) { // First validate the action before creating the task action, err := m.validateDownloadAction(ctx, actionID) if err != nil { return "", err } + // Decode metadata to get the filename + metadata, err := m.lumeraClient.DecodeCascadeMetadata(ctx, action) + if err != nil { + return "", fmt.Errorf("failed to decode cascade metadata: %w", err) + } + + // Ensure we have a filename from metadata + if metadata.FileName == "" { + return "", fmt.Errorf("no filename found in cascade metadata") + } + + // Ensure the output path includes the correct filename + finalOutputPath := path.Join(outputDir, action.ID, metadata.FileName) + taskID := uuid.New().String()[:8] - m.logger.Debug(ctx, "Generated download task ID", "task_id", taskID) + m.logger.Debug(ctx, "Generated download task ID", "task_id", taskID, "final_output_path", finalOutputPath) baseTask := BaseTask{ TaskID: taskID, @@ -257,7 +267,8 @@ func (m *ManagerImpl) CreateDownloadTask( logger: m.logger, } - task := NewCascadeDownloadTask(baseTask, actionID, outputPath) + // Use the final output path with the correct filename + task := NewCascadeDownloadTask(baseTask, actionID, finalOutputPath) // Store task in cache m.taskCache.Set(ctx, taskID, task, TaskTypeCascade, actionID) @@ -275,6 +286,6 @@ func (m *ManagerImpl) CreateDownloadTask( } }() - m.logger.Info(ctx, "Download Cascade task created successfully", "taskID", taskID) + m.logger.Info(ctx, "Download Cascade task created successfully", "taskID", taskID, "outputPath", finalOutputPath) return taskID, nil } diff --git a/tests/scripts/install-lumera.sh b/tests/scripts/install-lumera.sh index 4c657e82..d3c5773c 100755 --- a/tests/scripts/install-lumera.sh +++ b/tests/scripts/install-lumera.sh @@ -5,17 +5,40 @@ set -e # Exit immediately if a command exits with a non-zero status # ./install-lumera.sh # uses latest release # ./install-lumera.sh latest-tag # uses latest tag from /tags # ./install-lumera.sh v1.1.0 # installs this specific version +# LUMERAD_BINARY=/path/to/binary ./install-lumera.sh # uses existing binary -# Support mode argument: 'latest-release' (default) or 'latest-tag' +install_binary() { + local binary_path="$1" + chmod +x "$binary_path" + sudo cp "$binary_path" /usr/local/bin/lumerad + + # Verify installation + if which lumerad > /dev/null; then + echo "Installed: $(lumerad version 2>/dev/null || echo "unknown version")" + else + echo "Installation failed" + exit 1 + fi +} + +# Check if binary path is provided via environment variable +if [ -n "$LUMERAD_BINARY" ]; then + if [ ! -f "$LUMERAD_BINARY" ]; then + echo "Binary not found: $LUMERAD_BINARY" + exit 1 + fi + install_binary "$LUMERAD_BINARY" + exit 0 +fi + +# Support mode argument: 'latest-release' (default), 'latest-tag', or specific version MODE="${1:-latest-release}" REPO="LumeraProtocol/lumera" GITHUB_API="https://api.github.com/repos/$REPO" -echo "Installation mode: $MODE" - +# Determine tag and download URL based on mode if [ "$MODE" == "latest-tag" ]; then - echo "Fetching latest tag from GitHub..." if command -v jq >/dev/null 2>&1; then TAG_NAME=$(curl -s "$GITHUB_API/tags" | jq -r '.[0].name') else @@ -28,7 +51,6 @@ elif [[ "$MODE" =~ ^v[0-9]+\.[0-9]+\.[0-9]+$ ]]; then DOWNLOAD_URL="https://github.com/${REPO}/releases/download/${TAG_NAME}/lumera_${TAG_NAME}_linux_amd64.tar.gz" elif [ "$MODE" == "latest-release" ]; then - echo "Fetching latest release information..." RELEASE_INFO=$(curl -s -S -L "$GITHUB_API/releases/latest") # Extract tag name and download URL @@ -41,14 +63,15 @@ elif [ "$MODE" == "latest-release" ]; then fi else - echo "❌ Error: Invalid mode '$MODE'" + echo "Error: Invalid mode '$MODE'" echo "Usage: $0 [latest-release|latest-tag|vX.Y.Z]" + echo " or: LUMERAD_BINARY=/path/to/binary $0" exit 1 fi echo "Selected tag: $TAG_NAME" -echo "Download URL: $DOWNLOAD_URL" +# Validate that we have the required information if [ -z "$TAG_NAME" ] || [ -z "$DOWNLOAD_URL" ]; then echo "Error: Could not determine tag or download URL" exit 1 @@ -57,8 +80,8 @@ fi # Download and extract the release TEMP_DIR=$(mktemp -d) ORIG_DIR=$(pwd) -echo "Downloading Lumera from $DOWNLOAD_URL" curl -L --progress-bar "$DOWNLOAD_URL" -o "$TEMP_DIR/lumera.tar.gz" + cd "$TEMP_DIR" tar -xzf lumera.tar.gz rm lumera.tar.gz @@ -66,24 +89,13 @@ rm lumera.tar.gz # Install WASM library WASM_LIB=$(find . -type f -name "libwasmvm*.so" -print -quit) if [ -n "$WASM_LIB" ]; then - echo "Installing WASM library: $WASM_LIB" sudo cp "$WASM_LIB" /usr/lib/ fi # Find and install lumerad binary LUMERAD_PATH=$(find . -type f -name "lumerad" -print -quit) if [ -n "$LUMERAD_PATH" ]; then - echo "Installing lumerad binary from: $LUMERAD_PATH" - chmod +x "$LUMERAD_PATH" - sudo cp "$LUMERAD_PATH" /usr/local/bin/ - - # Verify installation - if which lumerad > /dev/null; then - echo "Installation successful. Lumerad version: $(lumerad version 2>/dev/null || echo "unknown")" - else - echo "Error: Lumerad installation failed" - exit 1 - fi + install_binary "$LUMERAD_PATH" else echo "Error: Could not find lumerad binary in the package" exit 1 @@ -91,5 +103,4 @@ fi # Clean up cd "$ORIG_DIR" -rm -rf "$TEMP_DIR" -echo "Lumera installation complete" \ No newline at end of file +rm -rf "$TEMP_DIR" \ No newline at end of file diff --git a/tests/system/e2e_cascade_test.go b/tests/system/e2e_cascade_test.go index 6402c6ad..b1ed8d42 100644 --- a/tests/system/e2e_cascade_test.go +++ b/tests/system/e2e_cascade_test.go @@ -238,7 +238,7 @@ func TestCascadeE2E(t *testing.T) { ) require.NoError(t, err, "Failed to create Lumera client configuration") - lumeraClinet, err := lumera.NewClient(ctx, lumeraCfg) + lumeraClinet, err := lumera.NewClient(context.Background(), lumeraCfg) require.NoError(t, err, "Failed to initialize Lumera client") // --------------------------------------- @@ -360,7 +360,7 @@ func TestCascadeE2E(t *testing.T) { // Wait for transaction to be included in a block sut.AwaitNextBlock(t) - + time.Sleep(5 * time.Second) // Allow some time for the transaction to be processed // Verify the account can be queried with its public key //accountResp := cli.CustomQuery("q", "auth", "account", userAddress) //require.Contains(t, accountResp, "public_key", "User account public key should be available") @@ -406,8 +406,6 @@ func TestCascadeE2E(t *testing.T) { require.NotEmpty(t, actionID, "Action ID should not be empty") t.Logf("Extracted action ID: %s", actionID) - time.Sleep(60 * time.Second) - // Set up action client configuration // This defines how to connect to network services accConfig := sdkconfig.AccountConfig{ @@ -427,7 +425,7 @@ func TestCascadeE2E(t *testing.T) { // Initialize action client for cascade operations actionClient, err := action.NewClient( - ctx, + context.Background(), actionConfig, nil, // Nil logger - use default @@ -443,7 +441,7 @@ func TestCascadeE2E(t *testing.T) { completionCh := make(chan bool, 1) // Subscribe to ALL events - err = actionClient.SubscribeToAllEvents(ctx, func(ctx context.Context, e event.Event) { + err = actionClient.SubscribeToAllEvents(context.Background(), func(ctx context.Context, e event.Event) { // Only capture TxhasReceived events if e.Type == event.SDKTaskTxHashReceived { if txHash, ok := e.Data[event.KeyTxHash].(string); ok && txHash != "" { @@ -560,17 +558,50 @@ func TestCascadeE2E(t *testing.T) { t.Log("Test completed successfully!") - time.Sleep(1 * time.Minute) + time.Sleep(10 * time.Second) - outputFileName := "output.txt" - outputFileFullpath := filepath.Join(t.TempDir(), outputFileName) + outputFileBaseDir := filepath.Join(".") // Try to download the file using the action ID - dtaskID, err := actionClient.DownloadCascade(ctx, actionID, outputFileFullpath) + dtaskID, err := actionClient.DownloadCascade(context.Background(), actionID, outputFileBaseDir) t.Logf("Download response: %s", dtaskID) require.NoError(t, err, "Failed to download cascade data using action ID") - time.Sleep(30 * time.Second) // Wait to ensure all events are processed + time.Sleep(10 * time.Second) // Wait to ensure all events are processed + + // --------------------------------------- + // Step 11: Validate downloaded files exist + // --------------------------------------- + t.Log("Step 11: Validating downloaded files exist in expected directory structure") + + // Construct expected directory path: baseDir/{actionID}/ + expectedDownloadDir := filepath.Join(outputFileBaseDir, actionID) + t.Logf("Checking for files in directory: %s", expectedDownloadDir) + + // Check if the action directory exists + if _, err := os.Stat(expectedDownloadDir); os.IsNotExist(err) { + t.Fatalf("Expected download directory does not exist: %s", expectedDownloadDir) + } + + // Read directory contents + files, err := os.ReadDir(expectedDownloadDir) + require.NoError(t, err, "Failed to read download directory: %s", expectedDownloadDir) + + // Filter out directories, only count actual files + fileCount := 0 + var fileNames []string + for _, file := range files { + if !file.IsDir() { + fileCount++ + fileNames = append(fileNames, file.Name()) + } + } + + t.Logf("Found %d files in download directory: %v", fileCount, fileNames) + + // Validate that at least one file was downloaded + require.True(t, fileCount >= 1, "Expected at least 1 file in download directory %s, found %d files", expectedDownloadDir, fileCount) + } func Blake3Hash(msg []byte) ([]byte, error) { hasher := blake3.New(32, nil) From 26491f6ccc635bbb18d2d7f23b3f6f38fb4a9515 Mon Sep 17 00:00:00 2001 From: Matee Ullah Malik Date: Thu, 26 Jun 2025 17:24:19 +0500 Subject: [PATCH 2/2] refactor: streamline P2P node parsing and bootstrap address handling --- p2p/kademlia/bootstrap.go | 49 ++++++++++++++++++++------------------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/p2p/kademlia/bootstrap.go b/p2p/kademlia/bootstrap.go index 6dcd71c2..8ce18f10 100644 --- a/p2p/kademlia/bootstrap.go +++ b/p2p/kademlia/bootstrap.go @@ -3,7 +3,6 @@ package kademlia import ( "context" "fmt" - "os" "strconv" "strings" "sync" @@ -34,11 +33,6 @@ func (s *DHT) parseNode(extP2P string, selfAddr string) (*Node, error) { return nil, errors.New("empty address") } - /*if strings.Contains(extP2P, "0.0.0.0") { - fmt.Println("skippping node") - return nil, errors.New("invalid address") - }*/ - if extP2P == selfAddr { return nil, errors.New("self address") } @@ -61,24 +55,12 @@ func (s *DHT) parseNode(extP2P string, selfAddr string) (*Node, error) { if err != nil { return nil, errors.New("invalid port number") } - - // For system testing, use port+1 if SYSTEM_TEST=true - if os.Getenv("SYSTEM_TEST") == "true" { - port = uint16(portNum) + 1 - logtrace.Info(context.Background(), "Using port+1 for system testing", logtrace.Fields{ - logtrace.FieldModule: "p2p", - "original_port": portNum, - "adjusted_port": port, - }) - } else { - // For normal P2P operation, always use the default port - port = defaultNetworkPort - } + port = uint16(portNum) } } else { // No port in the address ip = extP2P - port = defaultNetworkPort + port = defaultSuperNodeP2PPort } if ip == "" { @@ -170,13 +152,32 @@ func (s *DHT) ConfigureBootstrapNodes(ctx context.Context, bootstrapNodes string continue } - // Parse the node from the IP address - node, err := s.parseNode(latestIP, selfAddress) + // Extract IP from the address (remove port if present) + var ip string + if idx := strings.LastIndex(latestIP, ":"); idx != -1 { + ip = latestIP[:idx] + } else { + ip = latestIP + } + + // Use p2p_port from supernode record + p2pPort := defaultSuperNodeP2PPort + if supernode.P2PPort != "" { + if port, err := strconv.ParseUint(supernode.P2PPort, 10, 16); err == nil { + p2pPort = int(port) + } + } + + // Create full address with p2p port for validation + fullAddress := fmt.Sprintf("%s:%d", ip, p2pPort) + + // Parse the node from the full address + node, err := s.parseNode(fullAddress, selfAddress) if err != nil { logtrace.Warn(ctx, "Skip Bad Bootstrap Address", logtrace.Fields{ logtrace.FieldModule: "p2p", logtrace.FieldError: err.Error(), - "address": latestIP, + "address": fullAddress, "supernode": supernode.SupernodeAccount, }) continue @@ -184,7 +185,7 @@ func (s *DHT) ConfigureBootstrapNodes(ctx context.Context, bootstrapNodes string // Store the supernode account as the node ID node.ID = []byte(supernode.SupernodeAccount) - mapNodes[latestIP] = node + mapNodes[fullAddress] = node } // Convert the map to a slice