Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ replace github.com/cosmos/cosmos-sdk => github.com/cosmos/cosmos-sdk v0.50.14
require (
cosmossdk.io/math v1.5.3
github.com/AlecAivazis/survey/v2 v2.3.7
github.com/LumeraProtocol/lumera v1.8.1
github.com/LumeraProtocol/lumera v1.8.4
github.com/LumeraProtocol/rq-go v0.2.1
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce
github.com/cenkalti/backoff/v4 v4.3.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.50
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.50.0 h1:ig/FpDD2JofP/NExKQUbn7uOSZzJAQqogfqluZK4ed4=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.50.0/go.mod h1:otE2jQekW/PqXk1Awf5lmfokJx4uwuqcj1ab5SpGeW0=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/LumeraProtocol/lumera v1.8.1 h1:tN9h7hj1t9ImI0jdLKiTo1TkwCjr0wfT4DenzWh3bdY=
github.com/LumeraProtocol/lumera v1.8.1/go.mod h1:twrSLfuXcHvmfQoN5e02Bg7rfeevUjF34SVqEJIvH1E=
github.com/LumeraProtocol/lumera v1.8.4 h1:6XzLS9gd0m3lOnppNS05WuZx4VCBEGvUN/KpVkSjqro=
github.com/LumeraProtocol/lumera v1.8.4/go.mod h1:twrSLfuXcHvmfQoN5e02Bg7rfeevUjF34SVqEJIvH1E=
github.com/LumeraProtocol/rq-go v0.2.1 h1:8B3UzRChLsGMmvZ+UVbJsJj6JZzL9P9iYxbdUwGsQI4=
github.com/LumeraProtocol/rq-go v0.2.1/go.mod h1:APnKCZRh1Es2Vtrd2w4kCLgAyaL5Bqrkz/BURoRJ+O8=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
Expand Down
53 changes: 34 additions & 19 deletions sdk/task/cascade.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,14 @@ func (t *CascadeTask) Run(ctx context.Context) error {
return err
}

// 2 - Pre-filter: balance -> health -> XOR rank -> resources, then hand over
// 2 - Pre-filter: balance & health concurrently -> XOR rank, then hand over
originalCount := len(supernodes)
supernodes = t.filterByMinBalance(ctx, supernodes)
supernodes = t.filterByHealth(ctx, supernodes)
supernodes, preClients := t.filterEligibleSupernodesParallel(ctx, supernodes)
supernodes = t.orderByXORDistance(supernodes)
t.LogEvent(ctx, event.SDKSupernodesFound, "Supernodes filtered", event.EventData{event.KeyTotal: originalCount, event.KeyCount: len(supernodes)})

// 2 - Register with the supernodes
if err := t.registerWithSupernodes(ctx, supernodes); err != nil {
if err := t.registerWithSupernodes(ctx, supernodes, preClients); err != nil {
t.LogEvent(ctx, event.SDKTaskFailed, "Task failed", event.EventData{event.KeyError: err.Error()})
return err
}
Expand All @@ -64,7 +63,7 @@ func (t *CascadeTask) Run(ctx context.Context) error {
return nil
}

func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lumera.Supernodes) error {
func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lumera.Supernodes, preClients map[string]net.SupernodeClient) error {
factoryCfg := net.FactoryConfig{
LocalCosmosAddress: t.config.Account.LocalCosmosAddress,
PeerType: t.config.Account.PeerType,
Expand All @@ -79,6 +78,16 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum

ordered := supernodes

// Ensure any unused preClients are closed when we return
defer func() {
for addr, c := range preClients {
if c != nil {
_ = c.Close(ctx)
_ = addr // no-op
}
}
}()

var lastErr error
attempted := 0
for i, sn := range ordered {
Expand All @@ -91,7 +100,16 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum
})

attempted++
if err := t.attemptRegistration(ctx, iteration-1, sn, clientFactory, req); err != nil {
// Use pre-probed client if available; remove from map so we don't double-close in deferred cleanup
var pre net.SupernodeClient
if preClients != nil {
if c, ok := preClients[sn.CosmosAddress]; ok {
pre = c
delete(preClients, sn.CosmosAddress)
}
}

if err := t.attemptRegistration(ctx, iteration-1, sn, clientFactory, req, pre); err != nil {
t.LogEvent(ctx, event.SDKRegistrationFailure, "registration with supernode failed", event.EventData{
event.KeySupernode: sn.GrpcEndpoint,
event.KeySupernodeAddress: sn.CosmosAddress,
Expand All @@ -118,10 +136,16 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum
return fmt.Errorf("failed to upload to all supernodes")
}

func (t *CascadeTask) attemptRegistration(ctx context.Context, _ int, sn lumera.Supernode, factory *net.ClientFactory, req *supernodeservice.CascadeSupernodeRegisterRequest) error {
client, err := factory.CreateClient(ctx, sn)
if err != nil {
return fmt.Errorf("create client %s: %w", sn.CosmosAddress, err)
func (t *CascadeTask) attemptRegistration(ctx context.Context, _ int, sn lumera.Supernode, factory *net.ClientFactory, req *supernodeservice.CascadeSupernodeRegisterRequest, preClient net.SupernodeClient) error {
var client net.SupernodeClient
var err error
if preClient != nil {
client = preClient
} else {
client, err = factory.CreateClient(ctx, sn)
if err != nil {
return fmt.Errorf("create client %s: %w", sn.CosmosAddress, err)
}
}
defer client.Close(ctx)

Expand All @@ -131,15 +155,6 @@ func (t *CascadeTask) attemptRegistration(ctx context.Context, _ int, sn lumera.
event.KeySupernodeAddress: sn.CosmosAddress,
})

// Just-in-time resource check for uploads (storage + RAM >= 8x file size)
var minRam uint64
if size := getFileSizeBytes(t.filePath); size > 0 {
minRam = uint64(size) * uploadRAMMultiplier
}
if ok := t.resourcesOK(ctx, client, sn, minStorageThresholdBytes, minRam); !ok {
return fmt.Errorf("resource check failed")
}

req.EventLogger = func(ctx context.Context, evt event.EventType, msg string, data event.EventData) {
t.LogEvent(ctx, evt, msg, data)
}
Expand Down
50 changes: 34 additions & 16 deletions sdk/task/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,14 @@ func (t *CascadeDownloadTask) Run(ctx context.Context) error {
t.LogEvent(ctx, event.SDKTaskFailed, "task failed", event.EventData{event.KeyError: err.Error()})
return err
}
// 2 - Pre-filter: balance -> health -> XOR rank
// 2 - Pre-filter: balance & health concurrently -> XOR rank
originalCount := len(supernodes)
supernodes = t.filterByMinBalance(ctx, supernodes)
supernodes = t.filterByHealth(ctx, supernodes)
supernodes, preClients := t.filterEligibleSupernodesParallel(ctx, supernodes)
supernodes = t.orderByXORDistance(supernodes)
t.LogEvent(ctx, event.SDKSupernodesFound, "super-nodes filtered", event.EventData{event.KeyTotal: originalCount, event.KeyCount: len(supernodes)})

// 2 – download from super-nodes
if err := t.downloadFromSupernodes(ctx, supernodes); err != nil {
// 2 – download from super-nodes (reuse pre-probed clients when available)
if err := t.downloadFromSupernodes(ctx, supernodes, preClients); err != nil {
t.LogEvent(ctx, event.SDKTaskFailed, "task failed", event.EventData{event.KeyError: err.Error()})
return err
}
Expand All @@ -60,13 +59,23 @@ func (t *CascadeDownloadTask) Run(ctx context.Context) error {
return nil
}

func (t *CascadeDownloadTask) downloadFromSupernodes(ctx context.Context, supernodes lumera.Supernodes) error {
func (t *CascadeDownloadTask) downloadFromSupernodes(ctx context.Context, supernodes lumera.Supernodes, preClients map[string]net.SupernodeClient) error {
factoryCfg := net.FactoryConfig{
LocalCosmosAddress: t.config.Account.LocalCosmosAddress,
PeerType: t.config.Account.PeerType,
}
clientFactory := net.NewClientFactory(ctx, t.logger, t.keyring, t.client, factoryCfg)

// Ensure any unused preClients are closed when we return
defer func() {
for addr, c := range preClients {
if c != nil {
_ = c.Close(ctx)
_ = addr // retain for linter
}
}
}()

req := &supernodeservice.CascadeSupernodeDownloadRequest{
ActionID: t.actionId,
TaskID: t.TaskID,
Expand Down Expand Up @@ -96,10 +105,17 @@ func (t *CascadeDownloadTask) downloadFromSupernodes(ctx context.Context, supern
event.KeyIteration: iteration,
})

// Pre-filtering done; attempt directly
// Pre-filtering done; attempt directly (reuse preclient if present)

attempted++
if err := t.attemptDownload(ctx, sn, clientFactory, req); err != nil {
var pre net.SupernodeClient
if preClients != nil {
if c, ok := preClients[sn.CosmosAddress]; ok {
pre = c
delete(preClients, sn.CosmosAddress)
}
}
if err := t.attemptDownload(ctx, sn, clientFactory, req, pre); err != nil {
// Log failure and continue with the rest
t.LogEvent(ctx, event.SDKDownloadFailure, "download from super-node failed", event.EventData{
event.KeySupernode: sn.GrpcEndpoint,
Expand All @@ -126,21 +142,23 @@ func (t *CascadeDownloadTask) attemptDownload(
sn lumera.Supernode,
factory *net.ClientFactory,
req *supernodeservice.CascadeSupernodeDownloadRequest,
preClient net.SupernodeClient,
) error {
ctx, cancel := context.WithTimeout(parent, downloadTimeout)
defer cancel()

client, err := factory.CreateClient(ctx, sn)
if err != nil {
return fmt.Errorf("create client %s: %w", sn.CosmosAddress, err)
var client net.SupernodeClient
var err error
if preClient != nil {
client = preClient
} else {
client, err = factory.CreateClient(ctx, sn)
if err != nil {
return fmt.Errorf("create client %s: %w", sn.CosmosAddress, err)
}
}
defer client.Close(ctx)

// Just-in-time resource check for downloads (storage only)
if ok := t.resourcesOK(ctx, client, sn, minStorageThresholdBytes, 0); !ok {
return fmt.Errorf("resource check failed")
}

req.EventLogger = func(ctx context.Context, evt event.EventType, msg string, data event.EventData) {
t.LogEvent(ctx, evt, msg, data)
}
Expand Down
Loading