From 8214774ddd0d1b20bedafd8fc922cd703da4bbd8 Mon Sep 17 00:00:00 2001 From: Matee Ullah Malik Date: Thu, 18 Sep 2025 03:43:24 +0500 Subject: [PATCH 1/3] conn: keepalive monitor --- pkg/lumera/connection.go | 297 +++++++++++++++++++++++++++++++-------- 1 file changed, 241 insertions(+), 56 deletions(-) diff --git a/pkg/lumera/connection.go b/pkg/lumera/connection.go index e5ac5b4c..ab28702c 100644 --- a/pkg/lumera/connection.go +++ b/pkg/lumera/connection.go @@ -9,16 +9,29 @@ import ( "time" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/keepalive" + + "os" + + "github.com/LumeraProtocol/supernode/v2/pkg/logtrace" ) const ( - defaultLumeraPort = "9090" + // Time budget to wait for a candidate to reach READY + dialReadyTimeout = 10 * time.Second +) + +// Default ports to try when none is specified (order matters). +var defaultDialPorts = []string{"9090", "443"} - keepaliveTime = 6 * time.Minute - keepaliveTimeout = 10 * time.Second +// Long-lived connection keepalive params +const ( + keepaliveIdleTime = 10 * time.Minute + keepaliveAckTimeout = 20 * time.Second + reconnectionGracePeriod = 30 * time.Second ) // Connection defines the interface for a client connection. @@ -32,91 +45,263 @@ type grpcConnection struct { conn *grpc.ClientConn } -// newGRPCConnection creates a new gRPC connection. It chooses TLS when the -// address implies HTTPS/grpcs or port 443; otherwise it keeps the previous -// insecure (h2c) behaviour. +// newGRPCConnection creates a new gRPC connection by racing multiple candidates +// derived from the input address. The first candidate to become READY wins and +// the others are closed. Candidates include TLS and plaintext across common +// ports, depending only on whether a port was explicitly provided. func newGRPCConnection(ctx context.Context, rawAddr string) (Connection, error) { - hostPort, useTLS, serverName, err := normaliseAddr(rawAddr) + meta, err := parseAddrMeta(rawAddr) if err != nil { return nil, err } - var creds credentials.TransportCredentials - if useTLS { - creds = credentials.NewClientTLSFromCert(nil, serverName) - } else { - creds = insecure.NewCredentials() + cands := generateCandidates(meta) + if len(cands) == 0 { + return nil, fmt.Errorf("no connection candidates generated for %q", rawAddr) } - conn, err := createGRPCConnection(ctx, hostPort, creds) - if err != nil { - return nil, fmt.Errorf("failed to connect to gRPC server: %w", err) + // Parent context to cancel all attempts when one succeeds. + parentCtx, cancelAll := context.WithCancel(ctx) + defer cancelAll() + + type result struct { + conn *grpc.ClientConn + err error + cand dialCandidate + } + + resCh := make(chan result, len(cands)) + + for _, cand := range cands { + go func(c dialCandidate) { + creds := insecure.NewCredentials() + if c.useTLS { + creds = credentials.NewClientTLSFromCert(nil, c.serverName) + } + + // Per-attempt timeout + attemptCtx, cancel := context.WithTimeout(parentCtx, dialReadyTimeout) + defer cancel() + + conn, err := createGRPCConnection(attemptCtx, c.target, creds) + resCh <- result{conn: conn, err: err, cand: c} + }(cand) + } + + var firstConn *grpc.ClientConn + var firstCand dialCandidate + var firstErr error + var winnerIndex = -1 + // Collect results; return on first success. + for i := 0; i < len(cands); i++ { + r := <-resCh + if r.err == nil && r.conn != nil && winnerIndex == -1 { + firstConn = r.conn + firstCand = r.cand + winnerIndex = i + // Do not break yet; continue receiving to close any late winners. + continue + } + // Close any non-winning connection to avoid leaks. + if r.conn != nil { + _ = r.conn.Close() + } + // Keep the first error to report if all fail. + if firstErr == nil { + firstErr = r.err + } } - return &grpcConnection{conn: conn}, nil + if firstConn == nil { + if firstErr == nil { + firstErr = fmt.Errorf("all connection attempts failed") + } + return nil, firstErr + } + + // Cancel remaining attempts; return the winner. + cancelAll() + + // Info log showing final selected target and scheme + scheme := "plaintext" + if firstCand.useTLS { + scheme = "tls" + } + logtrace.Info(ctx, "gRPC connection established", logtrace.Fields{ + "target": firstCand.target, + "scheme": scheme, + }) + + // Start a monitor to terminate the app if connection is lost + go monitorConnection(ctx, firstConn) + + return &grpcConnection{conn: firstConn}, nil } -// Accepts all of these: -// -// https://grpc.testnet.lumera.io → TLS, host = grpc.testnet.lumera.io:443 -// grpcs://grpc.node9x.com:7443 → TLS, host = grpc.node9x.com:7443 -// grpc.node9x.com:443 → TLS, host = grpc.node9x.com:443 -// grpc.node9x.com:9090 → h2c, host = grpc.node9x.com:9090 -// grpc.testnet.lumera.io → h2c, host = grpc.testnet.lumera.io:9090 -func normaliseAddr(raw string) (hostPort string, useTLS bool, serverName string, err error) { - // If scheme present, parse as URL first. +// addressMeta captures parsed input and dialing policy. +type addressMeta struct { + host string + port string // optional; empty means "unspecified" + allowBoth bool + serverName string + hasExplicitPort bool +} + +// parseAddrMeta parses the raw address and determines dialing policy. +func parseAddrMeta(raw string) (addressMeta, error) { + var meta addressMeta + if strings.Contains(raw, "://") { u, err := url.Parse(raw) if err != nil { - return "", false, "", fmt.Errorf("parse address %q: %w", raw, err) + return meta, fmt.Errorf("parse address %q: %w", raw, err) } + // Ignore scheme for policy; use only host/port + meta.host = u.Hostname() + meta.port = u.Port() + meta.serverName = meta.host + meta.hasExplicitPort = meta.port != "" + meta.allowBoth = true + return meta, nil + } - host := u.Hostname() - port := u.Port() - switch u.Scheme { - case "https", "grpcs": - useTLS = true - if port == "" { - port = "443" - } - case "http", "grpc": - useTLS = false - if port == "" { - port = defaultLumeraPort - } - default: - return "", false, "", fmt.Errorf("unsupported scheme %q in %q", u.Scheme, raw) + // No scheme: split host[:port] + host, port, err := net.SplitHostPort(raw) + if err != nil { + // No port provided + meta.host = raw + meta.port = "" + meta.serverName = meta.host + meta.allowBoth = true + meta.hasExplicitPort = false + return meta, nil + } + meta.host = host + meta.port = port + meta.serverName = meta.host + meta.hasExplicitPort = true + // With explicit port: try both TLS and plaintext on that port. + meta.allowBoth = true + return meta, nil +} + +type dialCandidate struct { + target string // host:port + useTLS bool + serverName string +} + +// generateCandidates builds the set of dial candidates based on addressMeta. +func generateCandidates(meta addressMeta) []dialCandidate { + // Helper to append unique candidates + seen := make(map[string]struct{}) + add := func(host, port string, useTLS bool, serverName string, out *[]dialCandidate) { + if host == "" || port == "" { + return + } + target := net.JoinHostPort(host, port) + key := target + "|" + fmt.Sprint(useTLS) + if _, ok := seen[key]; ok { + return } - return net.JoinHostPort(host, port), useTLS, host, nil + seen[key] = struct{}{} + *out = append(*out, dialCandidate{target: target, useTLS: useTLS, serverName: serverName}) } - // No scheme: split host[:port]. - host, port, splitErr := net.SplitHostPort(raw) - if splitErr != nil { - // No port given → assume :9090 / plaintext. - return net.JoinHostPort(raw, defaultLumeraPort), false, raw, nil + var out []dialCandidate + // Determine port lists + var ports []string + if meta.hasExplicitPort { + ports = []string{meta.port} + } else { + ports = defaultDialPorts } - // Port explicit. - if port == "443" { - return net.JoinHostPort(host, port), true, host, nil + // Always allow both TLS and plaintext per requirements. + for _, p := range ports { + add(meta.host, p, true, meta.serverName, &out) + } + for _, p := range ports { + add(meta.host, p, false, meta.serverName, &out) } - return net.JoinHostPort(host, port), false, host, nil + return out } // createGRPCConnection creates a gRPC connection with keepalive func createGRPCConnection(ctx context.Context, hostPort string, creds credentials.TransportCredentials) (*grpc.ClientConn, error) { - _ = ctx // Keeping this for api compatibility opts := []grpc.DialOption{ grpc.WithTransportCredentials(creds), grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: keepaliveTime, - Timeout: keepaliveTimeout, - PermitWithoutStream: false, + Time: keepaliveIdleTime, + Timeout: keepaliveAckTimeout, + PermitWithoutStream: true, }), } - return grpc.NewClient(hostPort, opts...) + // Establish client connection (non-blocking) then wait until READY. + conn, err := grpc.NewClient(hostPort, opts...) + if err != nil { + return nil, err + } + + // Start connection attempts and wait for readiness with a bounded timeout. + conn.Connect() + + // Use provided context deadline if present; otherwise apply a default. + var cancel context.CancelFunc = func() {} + if _, ok := ctx.Deadline(); !ok { + ctx, cancel = context.WithTimeout(ctx, dialReadyTimeout) + } + defer cancel() + + for { + state := conn.GetState() + switch state { + case connectivity.Ready: + return conn, nil + case connectivity.Shutdown: + conn.Close() + return nil, fmt.Errorf("grpc connection is shutdown") + case connectivity.TransientFailure: + conn.Close() + return nil, fmt.Errorf("grpc connection is in transient failure") + default: + // Idle or Connecting: wait for a state change or timeout + if !conn.WaitForStateChange(ctx, state) { + conn.Close() + return nil, fmt.Errorf("timeout waiting for grpc connection readiness") + } + } + } +} + +// monitorConnection watches the connection state and exits the process if the +// connection transitions to Shutdown or remains in TransientFailure beyond a grace period. +func monitorConnection(ctx context.Context, conn *grpc.ClientConn) { + for { + state := conn.GetState() + switch state { + case connectivity.Shutdown: + logtrace.Error(ctx, "gRPC connection shutdown", logtrace.Fields{"action": "exit"}) + os.Exit(1) + case connectivity.TransientFailure: + // Allow some time to recover to Ready + gctx, cancel := context.WithTimeout(ctx, reconnectionGracePeriod) + for conn.GetState() == connectivity.TransientFailure { + if !conn.WaitForStateChange(gctx, connectivity.TransientFailure) { + cancel() + logtrace.Error(ctx, "gRPC connection lost (transient failure)", logtrace.Fields{"grace": reconnectionGracePeriod.String(), "action": "exit"}) + os.Exit(1) + } + } + cancel() + default: + // Idle/Connecting/Ready: just wait for state change + if !conn.WaitForStateChange(ctx, state) { + return + } + } + } } // Close closes the gRPC connection. From 4b459168c5e416ec59f2da12faf26ec0c26101d0 Mon Sep 17 00:00:00 2001 From: Matee Ullah Malik Date: Thu, 18 Sep 2025 03:43:24 +0500 Subject: [PATCH 2/3] docs: CONNECTION --- pkg/lumera/CONNECTION.md | 118 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 118 insertions(+) create mode 100644 pkg/lumera/CONNECTION.md diff --git a/pkg/lumera/CONNECTION.md b/pkg/lumera/CONNECTION.md new file mode 100644 index 00000000..86162757 --- /dev/null +++ b/pkg/lumera/CONNECTION.md @@ -0,0 +1,118 @@ +# Lumera gRPC Connection Design + +This document explains how `pkg/lumera/connection.go` establishes and maintains a robust, long‑lived gRPC client connection. + +## Goals + +- Be resilient to varying deployment setups (LBs, custom ports, TLS/plain). +- Prefer a fast, successful connection without relying on URL scheme. +- Keep the connection alive for long periods (days) safely. +- Fail fast at startup if no candidate becomes READY. +- Exit the process if a previously established connection is irrecoverably lost. + +## High‑Level Flow + +1. Parse input into `host` and optional `port` (ignore scheme entirely). +2. Generate dial candidates across ports and security modes: + - If no port: use default ports `[9090, 443]`. + - Always generate both TLS and plaintext candidates (4 total when no port; 2 total when a port is specified). +3. Race all candidates concurrently. +4. Each candidate dials non‑blocking, then explicitly waits until the connection reaches `READY` (or times out). +5. The first candidate to hit `READY` is selected; all others are closed (including “late winners”). +6. A monitor goroutine observes the connection state and exits the process if the connection is lost. + +## Inputs and Parsing + +- `parseAddrMeta(raw string)` extracts `host` and `port`. +- URL schemes (e.g., `https`, `grpcs`, `http`, `grpc`) are ignored for policy decisions. Only host/port matter. +- If no port is provided, we consider it “unspecified” and will try defaults. + +## Candidate Generation + +- `generateCandidates(meta)` creates a de‑duplicated set of `(target, useTLS)` pairs: + - Ports: + - explicit port → `[port]` + - no port → `defaultDialPorts = [9090, 443]` + - Security: + - Always generates both TLS and plaintext for the chosen ports. +- This yields: + - No scheme + no port: 4 candidates (TLS/PLAIN × 9090/443) + - Any input with explicit port: 2 candidates (TLS/PLAIN on that port) + +Note: TLS creds use `credentials.NewClientTLSFromCert(nil, serverName)`; plaintext uses `insecure.NewCredentials()`. + +## Dialing and Readiness + +- `createGRPCConnection` uses `grpc.NewClient(target, opts...)` and then: + - `conn.Connect()` to begin dialing + - waits in a loop until: + - `Ready` → success; return the connection + - `Shutdown`/`TransientFailure` → close and return error + - idle/connecting → wait for state change with a per‑attempt timeout +- Timeouts: + - `dialReadyTimeout` (default 10s) applies if the provided context has no deadline. + +## Selecting the Winner + +- `newGRPCConnection` starts all candidate attempts and collects results on a buffered channel. +- The first `READY` result becomes the winner; we keep receiving remaining results to explicitly close any non‑winners (including late winners) to avoid leaks. +- All pending attempts are canceled via a parent context cancellation. + +## Blocking Behavior + +- The constructor returns only after a connection is in `READY` state. +- If all candidates fail, it returns an error (startup abort). + +## Keepalive for Long‑Lived Connections + +- Client keepalive parameters are tuned for Cosmos‑SDK environments: + - `keepaliveIdleTime = 10m` (send pings no more than every 10 minutes when idle) + - `keepaliveAckTimeout = 20s` + - `PermitWithoutStream = true` +- Rationale: + - Conservative ping interval avoids server `GOAWAY` for “too_many_pings”. + - Keeps NAT/firewalls from silently expiring idle connections. + +## Connection Monitor and Process Exit + +- `monitorConnection` watches the connection state: + - `Shutdown` → log and `os.Exit(1)` + - `TransientFailure` → allow up to `reconnectionGracePeriod = 30s` for recovery; if not recovered, log and exit. +- Normal reconnection behavior is handled by gRPC; the monitor only exits if the connection does not recover within the grace period or is shut down definitively. + +## Logging + +- On success, we log: target (`host:port`) and scheme (`tls` or `plaintext`). +- Errors during attempts surface as aggregated failure if no candidate succeeds. + +## Thread‑Safety + +- The only map (`seen`) is local to candidate generation and used single‑threaded. +- Concurrency is limited to goroutines dialing candidates and sending results through a channel; no shared maps are mutated concurrently. +- Late winner cleanup explicitly closes extra connections to avoid resource leaks. + +## Configuration Knobs (Constants) + +- `defaultDialPorts = [9090, 443]` +- `dialReadyTimeout = 10s` (per attempt, if no deadline present) +- `keepaliveIdleTime = 10m` +- `keepaliveAckTimeout = 20s` +- `reconnectionGracePeriod = 30s` + +## Extensibility + +- Ports: Adjust `defaultDialPorts` if your environment prefers a different port set. +- TLS: To support custom roots or mTLS, add an option to inject `TransportCredentials` instead of the defaults. +- Policies: If future schemes or resolvers are introduced, they can be layered in before candidate generation. + +## Error Cases and Behavior + +- If no candidate reaches `READY` within `dialReadyTimeout` per attempt, `newGRPCConnection` returns an error. +- If the connection later enters a prolonged `TransientFailure` or `Shutdown`, the monitor exits the process. + +## FAQ + +- Why try both TLS and plaintext? We avoid making assumptions based on scheme and instead race practical permutations to maximize robustness across deployments. +- Why include both 9090 and 443? These are common in gRPC deployments (custom service ports and TLS‑terminating LBs). Adjust as needed for your infra. +- Does this support Unix sockets? Not currently; could be added by extending candidate generation. + From 88880e8fddd4ce2d8249588af03d6ffa8b072bca Mon Sep 17 00:00:00 2001 From: Matee Ullah Malik Date: Thu, 18 Sep 2025 03:44:11 +0500 Subject: [PATCH 3/3] latest-by-height + tests --- pkg/lumera/connection_test.go | 135 ++++++++++----------------- pkg/lumera/modules/supernode/impl.go | 58 +++++++----- sdk/adapters/lumera/adapter.go | 66 ++++++------- 3 files changed, 111 insertions(+), 148 deletions(-) diff --git a/pkg/lumera/connection_test.go b/pkg/lumera/connection_test.go index 8d5cfa5a..54282f52 100644 --- a/pkg/lumera/connection_test.go +++ b/pkg/lumera/connection_test.go @@ -2,97 +2,68 @@ package lumera import ( "testing" - "time" ) -func TestNormaliseAddr(t *testing.T) { - tests := []struct { - name string - input string - expectedHost string - expectedTLS bool - expectedServer string - expectError bool +func TestGenerateCandidates(t *testing.T) { + cases := []struct { + name string + input string + wantTLS int // count + wantPlain int // count + wantErr bool }{ - { - name: "https scheme", - input: "https://grpc.testnet.lumera.io", - expectedHost: "grpc.testnet.lumera.io:443", - expectedTLS: true, - expectedServer: "grpc.testnet.lumera.io", - expectError: false, - }, - { - name: "grpcs scheme with port", - input: "grpcs://grpc.node9x.com:7443", - expectedHost: "grpc.node9x.com:7443", - expectedTLS: true, - expectedServer: "grpc.node9x.com", - expectError: false, - }, - { - name: "host with port 443", - input: "grpc.node9x.com:443", - expectedHost: "grpc.node9x.com:443", - expectedTLS: true, - expectedServer: "grpc.node9x.com", - expectError: false, - }, - { - name: "host with custom port", - input: "grpc.node9x.com:9090", - expectedHost: "grpc.node9x.com:9090", - expectedTLS: false, - expectedServer: "grpc.node9x.com", - expectError: false, - }, - { - name: "host without port", - input: "grpc.testnet.lumera.io", - expectedHost: "grpc.testnet.lumera.io:9090", - expectedTLS: false, - expectedServer: "grpc.testnet.lumera.io", - expectError: false, - }, - { - name: "invalid scheme", - input: "ftp://invalid.com", - expectedHost: "", - expectedTLS: false, - expectedServer: "", - expectError: true, - }, + {name: "https no port", input: "https://grpc.testnet.lumera.io", wantTLS: 2, wantPlain: 2}, + {name: "grpcs with port", input: "grpcs://grpc.node9x.com:7443", wantTLS: 1, wantPlain: 1}, + {name: "http no port", input: "http://example.com", wantTLS: 2, wantPlain: 2}, + {name: "no scheme no port", input: "grpc.testnet.lumera.io", wantTLS: 2, wantPlain: 2}, + {name: "no scheme explicit 443", input: "grpc.node9x.com:443", wantTLS: 1, wantPlain: 1}, + {name: "no scheme explicit 9090", input: "grpc.node9x.com:9090", wantTLS: 1, wantPlain: 1}, + {name: "unknown scheme still ok", input: "ftp://invalid.com", wantTLS: 2, wantPlain: 2}, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - hostPort, useTLS, serverName, err := normaliseAddr(tt.input) - - if tt.expectError && err == nil { - t.Errorf("normaliseAddr(%s) expected error, got nil", tt.input) + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + meta, err := parseAddrMeta(tc.input) + if tc.wantErr { + if err == nil { + t.Fatalf("expected error, got nil") + } return } - - if !tt.expectError && err != nil { - t.Errorf("normaliseAddr(%s) unexpected error: %v", tt.input, err) - return + if err != nil { + t.Fatalf("unexpected error: %v", err) } - - if !tt.expectError { - if hostPort != tt.expectedHost { - t.Errorf("normaliseAddr(%s) hostPort = %s, want %s", tt.input, hostPort, tt.expectedHost) - } - if useTLS != tt.expectedTLS { - t.Errorf("normaliseAddr(%s) useTLS = %v, want %v", tt.input, useTLS, tt.expectedTLS) + cands := generateCandidates(meta) + if len(cands) == 0 { + t.Fatalf("no candidates generated") + } + gotTLS, gotPlain := 0, 0 + seen := map[string]bool{} + for _, c := range cands { + if seen[c.target+"|"+boolToStr(c.useTLS)] { + t.Fatalf("duplicate candidate: %v tls=%v", c.target, c.useTLS) } - if serverName != tt.expectedServer { - t.Errorf("normaliseAddr(%s) serverName = %s, want %s", tt.input, serverName, tt.expectedServer) + seen[c.target+"|"+boolToStr(c.useTLS)] = true + if c.useTLS { + gotTLS++ + } else { + gotPlain++ } } + if gotTLS != tc.wantTLS || gotPlain != tc.wantPlain { + t.Fatalf("unexpected counts: got tls=%d plain=%d want tls=%d plain=%d", gotTLS, gotPlain, tc.wantTLS, tc.wantPlain) + } }) } } +func boolToStr(b bool) string { + if b { + return "1" + } + return "0" +} + func TestGrpcConnectionMethods(t *testing.T) { // Test with nil connection conn := &grpcConnection{conn: nil} @@ -110,14 +81,4 @@ func TestGrpcConnectionMethods(t *testing.T) { } } -func TestConnectionConstants(t *testing.T) { - // Test that our constants are reasonable - if keepaliveTime < 10*time.Second { - t.Errorf("keepaliveTime too short: %v", keepaliveTime) - } - - if keepaliveTimeout >= keepaliveTime { - t.Errorf("keepaliveTimeout should be less than keepaliveTime: %v >= %v", keepaliveTimeout, keepaliveTime) - } -} - +// no keepalive constants to test anymore diff --git a/pkg/lumera/modules/supernode/impl.go b/pkg/lumera/modules/supernode/impl.go index 26d47c47..d0b633a8 100644 --- a/pkg/lumera/modules/supernode/impl.go +++ b/pkg/lumera/modules/supernode/impl.go @@ -3,7 +3,6 @@ package supernode import ( "context" "fmt" - "sort" "github.com/LumeraProtocol/lumera/x/supernode/v1/types" "github.com/LumeraProtocol/supernode/v2/pkg/errors" @@ -82,14 +81,22 @@ func Exists(nodes []*types.SuperNode, snAccAddress string) bool { } func GetLatestIP(supernode *types.SuperNode) (string, error) { - if len(supernode.PrevIpAddresses) == 0 { + if supernode == nil || len(supernode.PrevIpAddresses) == 0 { return "", errors.Errorf("no ip history exists for the supernode") } - sort.Slice(supernode.PrevIpAddresses, func(i, j int) bool { - return supernode.PrevIpAddresses[i].GetHeight() > supernode.PrevIpAddresses[j].GetHeight() - }) - - return supernode.PrevIpAddresses[0].Address, nil + var latest *types.IPAddressHistory + for _, r := range supernode.PrevIpAddresses { + if r == nil { + continue + } + if latest == nil || r.GetHeight() > latest.GetHeight() { + latest = r + } + } + if latest == nil { + return "", errors.Errorf("no valid ip record in history") + } + return latest.Address, nil } // GetSupernodeWithLatestAddress gets a supernode by account address and returns comprehensive info @@ -99,22 +106,25 @@ func (m *module) GetSupernodeWithLatestAddress(ctx context.Context, address stri return nil, fmt.Errorf("failed to get supernode: %w", err) } - // Get latest IP address + // Get latest IP address by max height var latestAddress string - if len(supernode.PrevIpAddresses) > 0 { - sort.Slice(supernode.PrevIpAddresses, func(i, j int) bool { - return supernode.PrevIpAddresses[i].GetHeight() > supernode.PrevIpAddresses[j].GetHeight() - }) - latestAddress = supernode.PrevIpAddresses[0].Address + if addr, err := GetLatestIP(supernode); err == nil { + latestAddress = addr } - // Get latest state + // Get latest state by max height var currentState string - if len(supernode.States) > 0 { - sort.Slice(supernode.States, func(i, j int) bool { - return supernode.States[i].Height > supernode.States[j].Height - }) - currentState = supernode.States[0].State.String() + var latestState *types.SuperNodeStateRecord + for _, s := range supernode.States { + if s == nil { + continue + } + if latestState == nil || s.Height > latestState.Height { + latestState = s + } + } + if latestState != nil { + currentState = latestState.State.String() } return &SuperNodeInfo{ @@ -128,9 +138,9 @@ func (m *module) GetSupernodeWithLatestAddress(ctx context.Context, address stri // ListSuperNodes retrieves all supernodes func (m *module) ListSuperNodes(ctx context.Context) (*types.QueryListSuperNodesResponse, error) { - resp, err := m.client.ListSuperNodes(ctx, &types.QueryListSuperNodesRequest{}) - if err != nil { - return nil, fmt.Errorf("failed to list supernodes: %w", err) - } - return resp, nil + resp, err := m.client.ListSuperNodes(ctx, &types.QueryListSuperNodesRequest{}) + if err != nil { + return nil, fmt.Errorf("failed to list supernodes: %w", err) + } + return resp, nil } diff --git a/sdk/adapters/lumera/adapter.go b/sdk/adapters/lumera/adapter.go index 8fe7a1fb..36038937 100644 --- a/sdk/adapters/lumera/adapter.go +++ b/sdk/adapters/lumera/adapter.go @@ -3,7 +3,6 @@ package lumera import ( "context" "fmt" - "sort" "github.com/LumeraProtocol/supernode/v2/sdk/log" @@ -104,26 +103,15 @@ func (a *Adapter) GetSupernodeWithLatestAddress(ctx context.Context, address str return nil, fmt.Errorf("received nil response for supernode %s", address) } - // Sort PrevIpAddresses by height in descending order - sort.Slice(resp.PrevIpAddresses, func(i, j int) bool { - return resp.PrevIpAddresses[i].Height > resp.PrevIpAddresses[j].Height - }) - - // Sort States by height in descending order - sort.Slice(resp.States, func(i, j int) bool { - return resp.States[i].Height > resp.States[j].Height - }) - - // Extract latest address + // Determine latest address/state strictly by max height latestAddress := "" - if len(resp.PrevIpAddresses) > 0 { - latestAddress = resp.PrevIpAddresses[0].Address + if addr, err := getLatestIP(resp); err == nil { + latestAddress = addr } - // Extract current state currentState := "" - if len(resp.States) > 0 { - currentState = resp.States[0].State.String() + if st, err := getLatestState(resp); err == nil && st != nil { + currentState = st.State.String() } info := &SuperNodeInfo{ @@ -283,17 +271,19 @@ func getLatestState(supernode *sntypes.SuperNode) (*sntypes.SuperNodeStateRecord return nil, fmt.Errorf("no state history exists for the supernode") } - // Sort by height in descending order to get the latest first - sort.Slice(supernode.States, func(i, j int) bool { - return supernode.States[i].Height > supernode.States[j].Height - }) - - // Access the latest state safely - if supernode.States[0] == nil { - return nil, fmt.Errorf("latest state in history is nil") + var latest *sntypes.SuperNodeStateRecord + for _, s := range supernode.States { + if s == nil { + continue + } + if latest == nil || s.Height > latest.Height { + latest = s + } } - - return supernode.States[0], nil + if latest == nil { + return nil, fmt.Errorf("no valid state in history") + } + return latest, nil } func getLatestIP(supernode *sntypes.SuperNode) (string, error) { @@ -306,15 +296,17 @@ func getLatestIP(supernode *sntypes.SuperNode) (string, error) { return "", fmt.Errorf("no ip history exists for the supernode") } - // Sort by height in descending order to get the latest first - sort.Slice(supernode.PrevIpAddresses, func(i, j int) bool { - return supernode.PrevIpAddresses[i].Height > supernode.PrevIpAddresses[j].Height - }) - - // Access the latest IP address safely - if supernode.PrevIpAddresses[0] == nil { - return "", fmt.Errorf("latest IP address in history is nil") + var latest *sntypes.IPAddressHistory + for _, r := range supernode.PrevIpAddresses { + if r == nil { + continue + } + if latest == nil || r.Height > latest.Height { + latest = r + } } - - return supernode.PrevIpAddresses[0].Address, nil + if latest == nil { + return "", fmt.Errorf("no valid IP address in history") + } + return latest.Address, nil }