Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,16 @@ CONFIG_FILE3=tests/system/config.test-3.yml
SETUP_SCRIPT=tests/scripts/setup-supernodes.sh

# Install Lumera
# Optional: specify lumera binary path to skip download
LUMERAD_BINARY ?=
# Optional: specify installation mode (latest-release, latest-tag, or vX.Y.Z)
INSTALL_MODE ?=latest-tag

install-lumera:
@echo "Installing Lumera..."
@chmod +x tests/scripts/install-lumera.sh
@sudo tests/scripts/install-lumera.sh latest-tag

@sudo LUMERAD_BINARY="$(LUMERAD_BINARY)" tests/scripts/install-lumera.sh $(INSTALL_MODE)
@echo "PtTDUHythfRfXHh63yzyiGDid4TZj2P76Zd,18749999981413" > ~/claims.csv
# Setup supernode environments
setup-supernodes:
@echo "Setting up all supernode environments..."
Expand Down
49 changes: 25 additions & 24 deletions p2p/kademlia/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package kademlia
import (
"context"
"fmt"
"os"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -34,11 +33,6 @@ func (s *DHT) parseNode(extP2P string, selfAddr string) (*Node, error) {
return nil, errors.New("empty address")
}

/*if strings.Contains(extP2P, "0.0.0.0") {
fmt.Println("skippping node")
return nil, errors.New("invalid address")
}*/

if extP2P == selfAddr {
return nil, errors.New("self address")
}
Expand All @@ -61,24 +55,12 @@ func (s *DHT) parseNode(extP2P string, selfAddr string) (*Node, error) {
if err != nil {
return nil, errors.New("invalid port number")
}

// For system testing, use port+1 if SYSTEM_TEST=true
if os.Getenv("SYSTEM_TEST") == "true" {
port = uint16(portNum) + 1
logtrace.Info(context.Background(), "Using port+1 for system testing", logtrace.Fields{
logtrace.FieldModule: "p2p",
"original_port": portNum,
"adjusted_port": port,
})
} else {
// For normal P2P operation, always use the default port
port = defaultNetworkPort
}
port = uint16(portNum)
}
} else {
// No port in the address
ip = extP2P
port = defaultNetworkPort
port = defaultSuperNodeP2PPort
}

if ip == "" {
Expand Down Expand Up @@ -170,21 +152,40 @@ func (s *DHT) ConfigureBootstrapNodes(ctx context.Context, bootstrapNodes string
continue
}

// Parse the node from the IP address
node, err := s.parseNode(latestIP, selfAddress)
// Extract IP from the address (remove port if present)
var ip string
if idx := strings.LastIndex(latestIP, ":"); idx != -1 {
ip = latestIP[:idx]
} else {
ip = latestIP
}

// Use p2p_port from supernode record
p2pPort := defaultSuperNodeP2PPort
if supernode.P2PPort != "" {
if port, err := strconv.ParseUint(supernode.P2PPort, 10, 16); err == nil {
p2pPort = int(port)
}
}

// Create full address with p2p port for validation
fullAddress := fmt.Sprintf("%s:%d", ip, p2pPort)

// Parse the node from the full address
node, err := s.parseNode(fullAddress, selfAddress)
if err != nil {
logtrace.Warn(ctx, "Skip Bad Bootstrap Address", logtrace.Fields{
logtrace.FieldModule: "p2p",
logtrace.FieldError: err.Error(),
"address": latestIP,
"address": fullAddress,
"supernode": supernode.SupernodeAccount,
})
continue
}

// Store the supernode account as the node ID
node.ID = []byte(supernode.SupernodeAccount)
mapNodes[latestIP] = node
mapNodes[fullAddress] = node
}

// Convert the map to a slice
Expand Down
16 changes: 6 additions & 10 deletions sdk/action/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@ import (
//
//go:generate mockery --name=Client --output=testutil/mocks --outpkg=mocks --filename=client_mock.go
type Client interface {
// - signature: Base64-encoded cryptographic signature of the file's data hash (blake3)
// 1- hash(blake3) > 2- sign > 3- base64
// The signature must be created by the same account that created the Lumera action.
// It must be a digital signature of the data hash found in the action's CASCADE metadata.
// StartCascade initiates a cascade operation with file path, action ID, and signature
// signature: Base64-encoded signature of file's blake3 hash by action creator
StartCascade(ctx context.Context, filePath string, actionID string, signature string) (string, error)
DeleteTask(ctx context.Context, taskID string) error
GetTask(ctx context.Context, taskID string) (*task.TaskEntry, bool)
SubscribeToEvents(ctx context.Context, eventType event.EventType, handler event.Handler) error
SubscribeToAllEvents(ctx context.Context, handler event.Handler) error
DownloadCascade(ctx context.Context, actionID, outputPath string) (string, error)
// DownloadCascade downloads cascade to outputDir, filename determined by action ID
DownloadCascade(ctx context.Context, actionID, outputDir string) (string, error)
}

// ClientImpl implements the Client interface
Expand Down Expand Up @@ -130,16 +129,13 @@ func (c *ClientImpl) SubscribeToAllEvents(ctx context.Context, handler event.Han
return nil
}

func (c *ClientImpl) DownloadCascade(
ctx context.Context,
actionID, outputPath string,
) (string, error) {
func (c *ClientImpl) DownloadCascade(ctx context.Context, actionID, outputDir string) (string, error) {

if actionID == "" {
return "", fmt.Errorf("actionID is empty")
}

taskID, err := c.taskManager.CreateDownloadTask(ctx, actionID, outputPath)
taskID, err := c.taskManager.CreateDownloadTask(ctx, actionID, outputDir)
if err != nil {
return "", fmt.Errorf("create download task: %w", err)
}
Expand Down
28 changes: 12 additions & 16 deletions sdk/adapters/supernodeservice/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"os"
"path/filepath"

"github.com/LumeraProtocol/supernode/gen/supernode/action/cascade"
"github.com/LumeraProtocol/supernode/pkg/net"
Expand Down Expand Up @@ -178,16 +179,20 @@ func (a *cascadeAdapter) CascadeSupernodeDownload(
ActionId: in.ActionID,
}, opts...)
if err != nil {
a.logger.Error(ctx, "failed to create download stream",
"action_id", in.ActionID, "error", err)
a.logger.Error(ctx, "failed to create download stream", "action_id", in.ActionID, "error", err)
return nil, err
}

// 2. Prepare destination file
// Create directory structure if it doesn't exist
if err := os.MkdirAll(filepath.Dir(in.OutputPath), 0755); err != nil {
a.logger.Error(ctx, "failed to create output directory", "path", filepath.Dir(in.OutputPath), "error", err)
return nil, fmt.Errorf("create output directory: %w", err)
}

outFile, err := os.Create(in.OutputPath)
if err != nil {
a.logger.Error(ctx, "failed to create output file",
"path", in.OutputPath, "error", err)
a.logger.Error(ctx, "failed to create output file", "path", in.OutputPath, "error", err)
return nil, fmt.Errorf("create output file: %w", err)
}
defer outFile.Close()
Expand All @@ -211,10 +216,7 @@ func (a *cascadeAdapter) CascadeSupernodeDownload(

// 3a. Progress / event message
case *cascade.DownloadResponse_Event:
a.logger.Info(ctx, "supernode event",
"event_type", x.Event.EventType,
"message", x.Event.Message,
"action_id", in.ActionID)
a.logger.Info(ctx, "supernode event", "event_type", x.Event.EventType, "message", x.Event.Message, "action_id", in.ActionID)

if in.EventLogger != nil {
in.EventLogger(ctx, toSdkEvent(x.Event.EventType), x.Event.Message, event.EventData{
Expand All @@ -237,17 +239,11 @@ func (a *cascadeAdapter) CascadeSupernodeDownload(
bytesWritten += int64(len(data))
chunkIndex++

a.logger.Debug(ctx, "received chunk",
"chunk_index", chunkIndex,
"chunk_size", len(data),
"bytes_written", bytesWritten)
a.logger.Debug(ctx, "received chunk", "chunk_index", chunkIndex, "chunk_size", len(data), "bytes_written", bytesWritten)
}
}

a.logger.Info(ctx, "download complete",
"bytes_written", bytesWritten,
"path", in.OutputPath,
"action_id", in.ActionID)
a.logger.Info(ctx, "download complete", "bytes_written", bytesWritten, "path", in.OutputPath, "action_id", in.ActionID)

return &CascadeSupernodeDownloadResponse{
Success: true,
Expand Down
156 changes: 134 additions & 22 deletions sdk/task/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,33 +65,42 @@ func (t *CascadeDownloadTask) downloadFromSupernodes(ctx context.Context, supern
OutputPath: t.outputPath,
}

var lastErr error
for idx, sn := range supernodes {
t.LogEvent(ctx, event.SDKDownloadAttempt, "attempting download from super-node", event.EventData{
event.KeySupernode: sn.GrpcEndpoint,
event.KeySupernodeAddress: sn.CosmosAddress,
event.KeyIteration: idx + 1,
})
// Process supernodes in pairs
var allErrors []error
for i := 0; i < len(supernodes); i += 2 {
// Determine how many supernodes to try in this batch (1 or 2)
batchSize := 2
if i+1 >= len(supernodes) {
batchSize = 1
}

if err := t.attemptDownload(ctx, sn, clientFactory, req); err != nil {
lastErr = err
t.LogEvent(ctx, event.SDKDownloadFailure, "download from super-node failed", event.EventData{
event.KeySupernode: sn.GrpcEndpoint,
event.KeySupernodeAddress: sn.CosmosAddress,
event.KeyIteration: idx + 1,
event.KeyError: err.Error(),
t.logger.Info(ctx, "attempting concurrent download from supernode batch", "batch_start", i, "batch_size", batchSize)

// Try downloading from this batch concurrently
result, batchErrors := t.attemptConcurrentDownload(ctx, supernodes[i:i+batchSize], clientFactory, req, i)

if result != nil {
// Success! Log and return
t.LogEvent(ctx, event.SDKDownloadSuccessful, "download successful", event.EventData{
event.KeySupernode: result.SupernodeEndpoint,
event.KeySupernodeAddress: result.SupernodeAddress,
event.KeyIteration: result.Iteration,
})
continue
return nil
}

t.LogEvent(ctx, event.SDKDownloadSuccessful, "download successful", event.EventData{
event.KeySupernode: sn.GrpcEndpoint,
event.KeySupernodeAddress: sn.CosmosAddress,
event.KeyIteration: idx + 1,
})
return nil
// Both (or the single one) failed, collect errors
allErrors = append(allErrors, batchErrors...)

// Log batch failure
t.logger.Warn(ctx, "download batch failed", "batch_start", i, "batch_size", batchSize, "errors", len(batchErrors))
}

// All attempts failed
if len(allErrors) > 0 {
return fmt.Errorf("failed to download from all super-nodes: %v", allErrors)
}
return fmt.Errorf("failed to download from all super-nodes: %w", lastErr)
return fmt.Errorf("no supernodes available for download")
}

func (t *CascadeDownloadTask) attemptDownload(
Expand Down Expand Up @@ -129,3 +138,106 @@ func (t *CascadeDownloadTask) attemptDownload(

return nil
}

// downloadResult identifies the supernode that served a successful download
// attempt, so the caller can emit the success event with the winner's details.
type downloadResult struct {
	SupernodeAddress  string // cosmos account address of the winning supernode
	SupernodeEndpoint string // gRPC endpoint the file was downloaded from
	Iteration         int    // 1-based attempt number across all batches
}

// attemptConcurrentDownload races a download attempt against every supernode
// in batch and returns as soon as one attempt succeeds.
//
// All attempts share a cancellable child context: the first success cancels
// its still-running siblings and the winner's metadata is returned. If every
// attempt fails, all collected errors are returned. Cancellation of the
// parent ctx aborts the wait and surfaces ctx.Err() as the sole error.
//
// NOTE(review): every attempt receives the same req.OutputPath, so concurrent
// attempts appear to write to the same file. If two attempts stream at once
// they can truncate/interleave each other's output — confirm whether
// per-attempt temporary paths are needed here.
func (t *CascadeDownloadTask) attemptConcurrentDownload(
	ctx context.Context,
	batch lumera.Supernodes,
	factory *net.ClientFactory,
	req *supernodeservice.CascadeSupernodeDownloadRequest,
	baseIteration int,
) (*downloadResult, []error) {
	// Child context lets the first winner stop its siblings.
	batchCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	type outcome struct {
		won *downloadResult // non-nil on success
		err error           // non-nil on failure
		idx int             // index into batch
	}
	// Buffered to len(batch) so late finishers can always send and exit.
	results := make(chan outcome, len(batch))

	for i, sn := range batch {
		iter := baseIteration + i + 1

		t.LogEvent(ctx, event.SDKDownloadAttempt, "attempting download from super-node", event.EventData{
			event.KeySupernode:        sn.GrpcEndpoint,
			event.KeySupernodeAddress: sn.CosmosAddress,
			event.KeyIteration:        iter,
		})

		// Pass loop values as arguments so each goroutine gets its own copies.
		go func(sn lumera.Supernode, i, iter int) {
			// Per-goroutine request copy; attempts never share mutable state.
			rc := &supernodeservice.CascadeSupernodeDownloadRequest{
				ActionID:   req.ActionID,
				TaskID:     req.TaskID,
				OutputPath: req.OutputPath,
			}

			if err := t.attemptDownload(batchCtx, sn, factory, rc); err != nil {
				results <- outcome{err: err, idx: i}
				return
			}

			results <- outcome{
				won: &downloadResult{
					SupernodeAddress:  sn.CosmosAddress,
					SupernodeEndpoint: sn.GrpcEndpoint,
					Iteration:         iter,
				},
				idx: i,
			}
		}(sn, i, iter)
	}

	var errs []error
	for done := 0; done < len(batch); done++ {
		select {
		case out := <-results:
			if out.won != nil {
				// Winner: cancel the siblings, drain their pending sends in
				// the background, and return immediately.
				cancel()
				go func(remaining int) {
					for k := 0; k < remaining; k++ {
						<-results
					}
				}(len(batch) - done - 1)
				return out.won, nil
			}

			failed := batch[out.idx]
			t.LogEvent(ctx, event.SDKDownloadFailure, "download from super-node failed", event.EventData{
				event.KeySupernode:        failed.GrpcEndpoint,
				event.KeySupernodeAddress: failed.CosmosAddress,
				event.KeyIteration:        baseIteration + out.idx + 1,
				event.KeyError:            out.err.Error(),
			})
			errs = append(errs, out.err)

		case <-ctx.Done():
			// Caller gave up; report the context error as the only failure.
			return nil, []error{ctx.Err()}
		}
	}

	// Every attempt in this batch failed.
	return nil, errs
}
Loading
Loading