diff --git a/go.mod b/go.mod index 34c07947..17a511bc 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ replace github.com/cosmos/cosmos-sdk => github.com/cosmos/cosmos-sdk v0.50.14 require ( cosmossdk.io/math v1.5.3 github.com/AlecAivazis/survey/v2 v2.3.7 - github.com/LumeraProtocol/lumera v1.8.1 + github.com/LumeraProtocol/lumera v1.8.4 github.com/LumeraProtocol/rq-go v0.2.1 github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce github.com/cenkalti/backoff/v4 v4.3.0 diff --git a/go.sum b/go.sum index 92ba1dfe..2ce74b3b 100644 --- a/go.sum +++ b/go.sum @@ -76,8 +76,8 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.50 github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.50.0 h1:ig/FpDD2JofP/NExKQUbn7uOSZzJAQqogfqluZK4ed4= github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.50.0/go.mod h1:otE2jQekW/PqXk1Awf5lmfokJx4uwuqcj1ab5SpGeW0= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= -github.com/LumeraProtocol/lumera v1.8.1 h1:tN9h7hj1t9ImI0jdLKiTo1TkwCjr0wfT4DenzWh3bdY= -github.com/LumeraProtocol/lumera v1.8.1/go.mod h1:twrSLfuXcHvmfQoN5e02Bg7rfeevUjF34SVqEJIvH1E= +github.com/LumeraProtocol/lumera v1.8.4 h1:6XzLS9gd0m3lOnppNS05WuZx4VCBEGvUN/KpVkSjqro= +github.com/LumeraProtocol/lumera v1.8.4/go.mod h1:twrSLfuXcHvmfQoN5e02Bg7rfeevUjF34SVqEJIvH1E= github.com/LumeraProtocol/rq-go v0.2.1 h1:8B3UzRChLsGMmvZ+UVbJsJj6JZzL9P9iYxbdUwGsQI4= github.com/LumeraProtocol/rq-go v0.2.1/go.mod h1:APnKCZRh1Es2Vtrd2w4kCLgAyaL5Bqrkz/BURoRJ+O8= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= diff --git a/sdk/task/cascade.go b/sdk/task/cascade.go index 1c3a57ff..3d269aac 100644 --- a/sdk/task/cascade.go +++ b/sdk/task/cascade.go @@ -46,15 +46,14 @@ func (t *CascadeTask) Run(ctx context.Context) error { return err } - // 2 - Pre-filter: balance -> health -> XOR rank -> resources, then hand over + // 2 - Pre-filter: balance & health concurrently -> XOR rank, then hand over originalCount := len(supernodes) - supernodes = t.filterByMinBalance(ctx, supernodes) - supernodes = t.filterByHealth(ctx, supernodes) + supernodes, preClients := t.filterEligibleSupernodesParallel(ctx, supernodes) supernodes = t.orderByXORDistance(supernodes) t.LogEvent(ctx, event.SDKSupernodesFound, "Supernodes filtered", event.EventData{event.KeyTotal: originalCount, event.KeyCount: len(supernodes)}) // 2 - Register with the supernodes - if err := t.registerWithSupernodes(ctx, supernodes); err != nil { + if err := t.registerWithSupernodes(ctx, supernodes, preClients); err != nil { t.LogEvent(ctx, event.SDKTaskFailed, "Task failed", event.EventData{event.KeyError: err.Error()}) return err } @@ -64,7 +63,7 @@ func (t *CascadeTask) Run(ctx context.Context) error { return nil } -func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lumera.Supernodes) error { +func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lumera.Supernodes, preClients map[string]net.SupernodeClient) error { factoryCfg := net.FactoryConfig{ LocalCosmosAddress: t.config.Account.LocalCosmosAddress, PeerType: t.config.Account.PeerType, @@ -79,6 +78,16 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum ordered := supernodes + // Ensure any unused preClients are closed when we return + defer func() { + for addr, c := range preClients { + if c != nil { + _ = c.Close(ctx) + _ = addr // no-op + } + } + }() + var lastErr error attempted := 0 for i, sn := range ordered { @@ -91,7 +100,16 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum }) attempted++ - if err := t.attemptRegistration(ctx, iteration-1, sn, clientFactory, req); err != nil { + // Use pre-probed client if available; remove from map so we don't double-close in deferred cleanup + var pre net.SupernodeClient + if preClients != nil { + if c, ok := preClients[sn.CosmosAddress]; ok { + pre = c + delete(preClients, sn.CosmosAddress) + } + } + + if err := t.attemptRegistration(ctx, iteration-1, sn, clientFactory, req, pre); err != nil { t.LogEvent(ctx, event.SDKRegistrationFailure, "registration with supernode failed", event.EventData{ event.KeySupernode: sn.GrpcEndpoint, event.KeySupernodeAddress: sn.CosmosAddress, @@ -118,10 +136,16 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum return fmt.Errorf("failed to upload to all supernodes") } -func (t *CascadeTask) attemptRegistration(ctx context.Context, _ int, sn lumera.Supernode, factory *net.ClientFactory, req *supernodeservice.CascadeSupernodeRegisterRequest) error { - client, err := factory.CreateClient(ctx, sn) - if err != nil { - return fmt.Errorf("create client %s: %w", sn.CosmosAddress, err) +func (t *CascadeTask) attemptRegistration(ctx context.Context, _ int, sn lumera.Supernode, factory *net.ClientFactory, req *supernodeservice.CascadeSupernodeRegisterRequest, preClient net.SupernodeClient) error { + var client net.SupernodeClient + var err error + if preClient != nil { + client = preClient + } else { + client, err = factory.CreateClient(ctx, sn) + if err != nil { + return fmt.Errorf("create client %s: %w", sn.CosmosAddress, err) + } } defer client.Close(ctx) @@ -131,15 +155,6 @@ func (t *CascadeTask) attemptRegistration(ctx context.Context, _ int, sn lumera. event.KeySupernodeAddress: sn.CosmosAddress, }) - // Just-in-time resource check for uploads (storage + RAM >= 8x file size) - var minRam uint64 - if size := getFileSizeBytes(t.filePath); size > 0 { - minRam = uint64(size) * uploadRAMMultiplier - } - if ok := t.resourcesOK(ctx, client, sn, minStorageThresholdBytes, minRam); !ok { - return fmt.Errorf("resource check failed") - } - req.EventLogger = func(ctx context.Context, evt event.EventType, msg string, data event.EventData) { t.LogEvent(ctx, evt, msg, data) } diff --git a/sdk/task/download.go b/sdk/task/download.go index 4fefe0e6..c9a5a4b1 100644 --- a/sdk/task/download.go +++ b/sdk/task/download.go @@ -43,15 +43,14 @@ func (t *CascadeDownloadTask) Run(ctx context.Context) error { t.LogEvent(ctx, event.SDKTaskFailed, "task failed", event.EventData{event.KeyError: err.Error()}) return err } - // 2 - Pre-filter: balance -> health -> XOR rank + // 2 - Pre-filter: balance & health concurrently -> XOR rank originalCount := len(supernodes) - supernodes = t.filterByMinBalance(ctx, supernodes) - supernodes = t.filterByHealth(ctx, supernodes) + supernodes, preClients := t.filterEligibleSupernodesParallel(ctx, supernodes) supernodes = t.orderByXORDistance(supernodes) t.LogEvent(ctx, event.SDKSupernodesFound, "super-nodes filtered", event.EventData{event.KeyTotal: originalCount, event.KeyCount: len(supernodes)}) - // 2 – download from super-nodes - if err := t.downloadFromSupernodes(ctx, supernodes); err != nil { + // 2 – download from super-nodes (reuse pre-probed clients when available) + if err := t.downloadFromSupernodes(ctx, supernodes, preClients); err != nil { t.LogEvent(ctx, event.SDKTaskFailed, "task failed", event.EventData{event.KeyError: err.Error()}) return err } @@ -60,13 +59,23 @@ func (t *CascadeDownloadTask) Run(ctx context.Context) error { return nil } -func (t *CascadeDownloadTask) downloadFromSupernodes(ctx context.Context, supernodes lumera.Supernodes) error { +func (t *CascadeDownloadTask) downloadFromSupernodes(ctx context.Context, supernodes lumera.Supernodes, preClients map[string]net.SupernodeClient) error { factoryCfg := net.FactoryConfig{ LocalCosmosAddress: t.config.Account.LocalCosmosAddress, PeerType: t.config.Account.PeerType, } clientFactory := net.NewClientFactory(ctx, t.logger, t.keyring, t.client, factoryCfg) + // Ensure any unused preClients are closed when we return + defer func() { + for addr, c := range preClients { + if c != nil { + _ = c.Close(ctx) + _ = addr // retain for linter + } + } + }() + req := &supernodeservice.CascadeSupernodeDownloadRequest{ ActionID: t.actionId, TaskID: t.TaskID, @@ -96,10 +105,17 @@ func (t *CascadeDownloadTask) downloadFromSupernodes(ctx context.Context, supern event.KeyIteration: iteration, }) - // Pre-filtering done; attempt directly + // Pre-filtering done; attempt directly (reuse preclient if present) attempted++ - if err := t.attemptDownload(ctx, sn, clientFactory, req); err != nil { + var pre net.SupernodeClient + if preClients != nil { + if c, ok := preClients[sn.CosmosAddress]; ok { + pre = c + delete(preClients, sn.CosmosAddress) + } + } + if err := t.attemptDownload(ctx, sn, clientFactory, req, pre); err != nil { // Log failure and continue with the rest t.LogEvent(ctx, event.SDKDownloadFailure, "download from super-node failed", event.EventData{ event.KeySupernode: sn.GrpcEndpoint, @@ -126,21 +142,23 @@ func (t *CascadeDownloadTask) attemptDownload( sn lumera.Supernode, factory *net.ClientFactory, req *supernodeservice.CascadeSupernodeDownloadRequest, + preClient net.SupernodeClient, ) error { ctx, cancel := context.WithTimeout(parent, downloadTimeout) defer cancel() - client, err := factory.CreateClient(ctx, sn) - if err != nil { - return fmt.Errorf("create client %s: %w", sn.CosmosAddress, err) + var client net.SupernodeClient + var err error + if preClient != nil { + client = preClient + } else { + client, err = factory.CreateClient(ctx, sn) + if err != nil { + return fmt.Errorf("create client %s: %w", sn.CosmosAddress, err) + } } defer client.Close(ctx) - // Just-in-time resource check for downloads (storage only) - if ok := t.resourcesOK(ctx, client, sn, minStorageThresholdBytes, 0); !ok { - return fmt.Errorf("resource check failed") - } - req.EventLogger = func(ctx context.Context, evt event.EventType, msg string, data event.EventData) { t.LogEvent(ctx, evt, msg, data) } diff --git a/sdk/task/task.go b/sdk/task/task.go index a212e76a..361756a6 100644 --- a/sdk/task/task.go +++ b/sdk/task/task.go @@ -4,7 +4,7 @@ import ( "context" "errors" "fmt" - "os" + "sync" sdkmath "cosmossdk.io/math" txmod "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/tx" @@ -25,12 +25,9 @@ const ( TaskTypeCascade TaskType = "CASCADE" ) -// Package-level thresholds and tuning const ( - // Minimum available storage required on any volume (bytes) - minStorageThresholdBytes uint64 = 50 * 1024 * 1024 * 1024 // 50 GB - // Upload requires free RAM to be at least 8x the file size - uploadRAMMultiplier uint64 = 8 + prefilterParallelism int = 10 + minEligibleBalanceULUME int64 = 1_000_000 // 1 LUME in ulume ) // EventCallback is a function that processes events from tasks @@ -106,109 +103,124 @@ func (t *BaseTask) orderByXORDistance(sns lumera.Supernodes) lumera.Supernodes { return orderSupernodesByDeterministicDistance(seed, sns) } -// helper: get file size (bytes). returns 0 on error -func getFileSizeBytes(p string) int64 { - fi, err := os.Stat(p) - if err != nil { - return 0 - } - return fi.Size() -} - -func (t *BaseTask) resourcesOK(ctx context.Context, client net.SupernodeClient, sn lumera.Supernode, minStorageBytes uint64, minFreeRamBytes uint64) bool { - // In tests, skip resource thresholds (keep balance + health via nodeQualifies) - if os.Getenv("INTEGRATION_TEST") == "true" { - return true - } - status, err := client.GetSupernodeStatus(ctx) - if err != nil || status == nil || status.Resources == nil { - return false - } - // Storage: any volume must satisfy available >= minStorageBytes - if minStorageBytes > 0 { - ok := false - for _, vol := range status.Resources.StorageVolumes { - if vol != nil && vol.AvailableBytes >= minStorageBytes { - ok = true - break - } - } - if !ok { - return false - } - } - // RAM: available_gb must be >= required GiB - if minFreeRamBytes > 0 { - mem := status.Resources.Memory - if mem == nil { - return false - } - requiredGiB := float64(minFreeRamBytes) / (1024.0 * 1024.0 * 1024.0) - if mem.AvailableGb < requiredGiB { - return false - } - } - return true -} - -// filterByHealth returns nodes that report gRPC health SERVING. -func (t *BaseTask) filterByHealth(parent context.Context, sns lumera.Supernodes) lumera.Supernodes { +// filterEligibleSupernodesParallel +// Fast, bounded-concurrency discovery that keeps only nodes that pass: +// +// (1) gRPC Health SERVING +// (2) Peers > 1 (via Status API; single-line gate for basic network liveness) +// (3) On-chain balance >= 1 LUME (in ulume) +// +// Strategy: +// - Spawn at most prefilterParallelism goroutines (bounded fan-out). +// - For each node, run Health (incl. dial) and Balance concurrently under one timeout. +// - Early-cancel sibling work on definitive failure to save time. +// - Reuse healthy client connections during registration to skip a second dial. +func (t *BaseTask) filterEligibleSupernodesParallel(parent context.Context, sns lumera.Supernodes) (lumera.Supernodes, map[string]net.SupernodeClient) { if len(sns) == 0 { - return sns + return sns, nil } + + // Step 0 — shared state for this pass keep := make([]bool, len(sns)) - for i, sn := range sns { - i, sn := i, sn - ctx, cancel := context.WithTimeout(parent, connectionTimeout) - func() { - defer cancel() - client, err := net.NewClientFactory(ctx, t.logger, t.keyring, t.client, net.FactoryConfig{ - LocalCosmosAddress: t.config.Account.LocalCosmosAddress, - PeerType: t.config.Account.PeerType, - }).CreateClient(ctx, sn) - if err != nil { - return - } - defer client.Close(ctx) - h, err := client.HealthCheck(ctx) - if err == nil && h != nil && h.Status == grpc_health_v1.HealthCheckResponse_SERVING { - keep[i] = true - } - }() - } - out := make(lumera.Supernodes, 0, len(sns)) - for i, sn := range sns { - if keep[i] { - out = append(out, sn) - } - } - return out -} + var wg sync.WaitGroup + sem := make(chan struct{}, prefilterParallelism) + preClients := make(map[string]net.SupernodeClient) + var mtx sync.Mutex -// filterByMinBalance filters by requiring at least a minimum balance in the default fee denom. -func (t *BaseTask) filterByMinBalance(parent context.Context, sns lumera.Supernodes) lumera.Supernodes { - if len(sns) == 0 { - return sns - } - min := sdkmath.NewInt(1_000_000) // 1 LUME in ulume + // Step 0.1 — constants/resource handles used by probes + min := sdkmath.NewInt(minEligibleBalanceULUME) // 1 LUME in ulume denom := txmod.DefaultFeeDenom - keep := make([]bool, len(sns)) + + factoryCfg := net.FactoryConfig{ + LocalCosmosAddress: t.config.Account.LocalCosmosAddress, + PeerType: t.config.Account.PeerType, + } + clientFactory := net.NewClientFactory(parent, t.logger, t.keyring, t.client, factoryCfg) + + // Step 1 — spawn bounded goroutines, one per supernode for i, sn := range sns { - i, sn := i, sn - ctx, cancel := context.WithTimeout(parent, connectionTimeout) - func() { + wg.Add(1) + go func(i int, sn lumera.Supernode) { + defer wg.Done() + sem <- struct{}{} + defer func() { <-sem }() + + // Step 1.1 — per-node shared timeout + cancellation for early exit + probeCtx, cancel := context.WithTimeout(parent, connectionTimeout) defer cancel() - bal, err := t.client.GetBalance(ctx, sn.CosmosAddress, denom) - if err == nil && bal != nil && bal.Balance != nil && !bal.Balance.Amount.LT(min) { + + var innerWg sync.WaitGroup + var healthOK, balanceOK bool + var c net.SupernodeClient + + innerWg.Add(2) + + // Step 1.2.1 — Health probe (dial + check) — create client here so balance starts immediately + go func() { + defer innerWg.Done() + client, err := clientFactory.CreateClient(probeCtx, sn) + if err != nil { + // Unable to connect → ineligible; cancel sibling + cancel() + return + } + // Keep reference for potential reuse; close later based on outcome + c = client + // (1) Health SERVING gate + h, err := client.HealthCheck(probeCtx) + if err == nil && h != nil && h.Status == grpc_health_v1.HealthCheckResponse_SERVING { + healthOK = true + } else { + // Health failed → ineligible; cancel sibling + cancel() + return + } + // (2) One-liner peers>1 gate using Status API to ensure network liveness + if st, err := client.GetSupernodeStatus(probeCtx); err != nil || st == nil || st.Network == nil || st.Network.PeersCount <= 1 { + healthOK = false + cancel() + return + } + }() + + // Step 1.2.2 — Balance probe (chain) — independent of gRPC dial + go func() { + defer innerWg.Done() + bal, err := t.client.GetBalance(probeCtx, sn.CosmosAddress, denom) + if err == nil && bal != nil && bal.Balance != nil && !bal.Balance.Amount.LT(min) { + balanceOK = true + } else { + // Insufficient or error → ineligible; cancel sibling + cancel() + } + }() + + // Step 1.3 — Wait for both probes, then decide eligibility and manage client lifecycle + innerWg.Wait() + if healthOK && balanceOK { keep[i] = true + // Stash the client for reuse; key by CosmosAddress + if c != nil { + mtx.Lock() + preClients[sn.CosmosAddress] = c + mtx.Unlock() + } + } else { + // Close any created client we won't reuse + if c != nil { + _ = c.Close(context.Background()) + } } - }() + }(i, sn) } + wg.Wait() + + // Step 2 — build output preserving original order out := make(lumera.Supernodes, 0, len(sns)) for i, sn := range sns { if keep[i] { out = append(out, sn) } } - return out + return out, preClients } diff --git a/tests/system/go.mod b/tests/system/go.mod index b98ddfdb..4fac1d6a 100644 --- a/tests/system/go.mod +++ b/tests/system/go.mod @@ -54,7 +54,7 @@ require ( github.com/99designs/keyring v1.2.2 // indirect github.com/DataDog/datadog-go v4.8.3+incompatible // indirect github.com/DataDog/zstd v1.5.7 // indirect - github.com/LumeraProtocol/lumera v1.8.1 // indirect + github.com/LumeraProtocol/lumera v1.8.4 // indirect github.com/LumeraProtocol/rq-go v0.2.1 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect github.com/beorn7/perks v1.0.1 // indirect diff --git a/tests/system/go.sum b/tests/system/go.sum index fff10606..9e5e89ad 100644 --- a/tests/system/go.sum +++ b/tests/system/go.sum @@ -72,8 +72,8 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.50 github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.50.0 h1:ig/FpDD2JofP/NExKQUbn7uOSZzJAQqogfqluZK4ed4= github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.50.0/go.mod h1:otE2jQekW/PqXk1Awf5lmfokJx4uwuqcj1ab5SpGeW0= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= -github.com/LumeraProtocol/lumera v1.8.1 h1:tN9h7hj1t9ImI0jdLKiTo1TkwCjr0wfT4DenzWh3bdY= -github.com/LumeraProtocol/lumera v1.8.1/go.mod h1:twrSLfuXcHvmfQoN5e02Bg7rfeevUjF34SVqEJIvH1E= +github.com/LumeraProtocol/lumera v1.8.4 h1:6XzLS9gd0m3lOnppNS05WuZx4VCBEGvUN/KpVkSjqro= +github.com/LumeraProtocol/lumera v1.8.4/go.mod h1:twrSLfuXcHvmfQoN5e02Bg7rfeevUjF34SVqEJIvH1E= github.com/LumeraProtocol/rq-go v0.2.1 h1:8B3UzRChLsGMmvZ+UVbJsJj6JZzL9P9iYxbdUwGsQI4= github.com/LumeraProtocol/rq-go v0.2.1/go.mod h1:APnKCZRh1Es2Vtrd2w4kCLgAyaL5Bqrkz/BURoRJ+O8= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=