Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions p2p/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ type Config struct {

// BootstrapNodes is ONLY used for integration testing to inject a Node's IP address
BootstrapNodes string `mapstructure:"bootstrap_nodes" json:"bootstrap_nodes,omitempty"`
// ExternalIP is ONLY used for integration testing to assign a fixed IP address
ExternalIP string `mapstructure:"external_ip" json:"external_ip,omitempty"`

// ID of supernode to be used in P2P - Supposed to be the PastelID
ID string `mapstructure:"id" json:"-"`
Expand Down
6 changes: 3 additions & 3 deletions p2p/kademlia/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,11 @@ func (s *DHT) ConfigureBootstrapNodes(ctx context.Context, bootstrapNodes string
return s.setBootstrapNodesFromConfigVar(ctx, bootstrapNodes)
}

selfAddress, err := s.getExternalIP()
supernodeAddr, err := s.getSupernodeAddress(ctx)
if err != nil {
return fmt.Errorf("get external ip addr: %s", err)
return fmt.Errorf("get supernode address: %s", err)
}
selfAddress = fmt.Sprintf("%s:%d", selfAddress, s.options.Port)
selfAddress := fmt.Sprintf("%s:%d", parseSupernodeAddress(supernodeAddr), s.options.Port)

var boostrapNodes []*Node

Expand Down
58 changes: 35 additions & 23 deletions p2p/kademlia/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"encoding/hex"
"fmt"
"math"
"os"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -55,7 +55,7 @@ type DHT struct {
done chan struct{} // distributed hash table is done
cache storage.KeyValue // store bad bootstrap addresses
bsConnected map[string]bool // map of connected bootstrap nodes [identity] -> connected
externalIP string
supernodeAddr string // cached address from chain
mtx sync.Mutex
ignorelist *BanList
replicationMtx sync.RWMutex
Expand All @@ -81,8 +81,6 @@ type Options struct {

// Keyring for credentials
Keyring keyring.Keyring

ExternalIP string
}

// NewDHT returns a new DHT node
Expand All @@ -107,10 +105,6 @@ func NewDHT(ctx context.Context, store Store, metaStore MetaStore, options *Opti
rqstore: rqstore,
}

if options.ExternalIP != "" {
s.externalIP = options.ExternalIP
}

// Check that keyring is provided
if options.Keyring == nil {
return nil, fmt.Errorf("keyring is required but not provided")
Expand Down Expand Up @@ -166,28 +160,42 @@ func (s *DHT) NodesLen() int {
return len(s.ht.nodes())
}

func (s *DHT) getExternalIP() (string, error) {
func (s *DHT) getSupernodeAddress(ctx context.Context) (string, error) {
s.mtx.Lock()
defer s.mtx.Unlock()

// Return cached value if already determined
if s.externalIP != "" {
return s.externalIP, nil
if s.supernodeAddr != "" {
return s.supernodeAddr, nil
}

// Check environment variable to control IP behavior
if useExternal := os.Getenv("P2P_USE_EXTERNAL_IP"); useExternal == "false" || useExternal == "0" {
s.externalIP = s.ht.self.IP
return s.externalIP, nil
// Query chain for supernode info
supernodeInfo, err := s.options.LumeraClient.SuperNode().GetSupernodeWithLatestAddress(ctx, string(s.options.ID))
if err != nil || supernodeInfo == nil {
// Fallback to local IP if chain query fails
s.supernodeAddr = s.ht.self.IP
return s.supernodeAddr, nil
}

externalIP, err := utils.GetExternalIPAddress()
if err != nil {
return "", fmt.Errorf("get external ip addr: %s", err)
s.supernodeAddr = supernodeInfo.LatestAddress
return supernodeInfo.LatestAddress, nil
}

// parseSupernodeAddress extracts the host from various address formats
func parseSupernodeAddress(address string) string {
// Remove protocol prefixes
if strings.HasPrefix(address, "https://") {
address = strings.TrimPrefix(address, "https://")
} else if strings.HasPrefix(address, "http://") {
address = strings.TrimPrefix(address, "http://")
}

// Extract host part (remove port if present)
if idx := strings.LastIndex(address, ":"); idx != -1 {
return address[:idx]
}

s.externalIP = externalIP
return externalIP, nil
return address
}

// Start the distributed hash table
Expand Down Expand Up @@ -365,9 +373,11 @@ func (s *DHT) Stats(ctx context.Context) (map[string]interface{}, error) {

// new a message
func (s *DHT) newMessage(messageType int, receiver *Node, data interface{}) *Message {
externalIP, _ := s.getExternalIP()
ctx := context.Background()
supernodeAddr, _ := s.getSupernodeAddress(ctx)
hostIP := parseSupernodeAddress(supernodeAddr)
sender := &Node{
IP: externalIP,
IP: hostIP,
ID: s.ht.self.ID,
Port: s.ht.self.Port,
}
Expand Down Expand Up @@ -574,7 +584,9 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
result[key] = nil
}

self := &Node{ID: s.ht.self.ID, IP: s.externalIP, Port: s.ht.self.Port}
supernodeAddr, _ := s.getSupernodeAddress(ctx)
hostIP := parseSupernodeAddress(supernodeAddr)
self := &Node{ID: s.ht.self.ID, IP: hostIP, Port: s.ht.self.Port}
self.SetHashedID()

// populate hexKeys and hashes
Expand Down
4 changes: 3 additions & 1 deletion p2p/kademlia/redundant_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ func (s *DHT) cleanupRedundantDataWorker(ctx context.Context) {
replicationKeys := s.store.GetKeysForReplication(ctx, from, to)

ignores := s.ignorelist.ToNodeList()
self := &Node{ID: s.ht.self.ID, IP: s.externalIP, Port: s.ht.self.Port}
supernodeAddr, _ := s.getSupernodeAddress(ctx)
hostIP := parseSupernodeAddress(supernodeAddr)
self := &Node{ID: s.ht.self.ID, IP: hostIP, Port: s.ht.self.Port}
self.SetHashedID()

closestContactsMap := make(map[string][][]byte)
Expand Down
4 changes: 3 additions & 1 deletion p2p/kademlia/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ func (s *DHT) Replicate(ctx context.Context) {
ignores := s.ignorelist.ToNodeList()
closestContactsMap := make(map[string][][]byte)

self := &Node{ID: s.ht.self.ID, IP: s.externalIP, Port: s.ht.self.Port}
supernodeAddr, _ := s.getSupernodeAddress(ctx)
hostIP := parseSupernodeAddress(supernodeAddr)
self := &Node{ID: s.ht.self.ID, IP: hostIP, Port: s.ht.self.Port}
self.SetHashedID()

for i := 0; i < len(replicationKeys); i++ {
Expand Down
2 changes: 1 addition & 1 deletion p2p/kademlia/store/sqlite/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (s *Store) StoreBatchRepKeys(values []string, id string, ip string, port ui
func (s *Store) GetKeysForReplication(ctx context.Context, from time.Time, to time.Time) domain.KeysWithTimestamp {
var results []domain.KeyWithTimestamp
query := `SELECT key, createdAt FROM data WHERE createdAt > ? AND createdAt < ? ORDER BY createdAt ASC`

logtrace.Debug(ctx, "fetching keys for replication", logtrace.Fields{
logtrace.FieldModule: "p2p",
"from_time": from,
Expand Down
4 changes: 0 additions & 4 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,6 @@ func (s *p2p) configure(ctx context.Context) error {
errors.Errorf("node id is empty")
}

// We Set ExternalIP only for integration tests
if s.config.BootstrapNodes != "" && s.config.ExternalIP != "" {
kadOpts.ExternalIP = s.config.ExternalIP
}
// new a kademlia distributed hash table
dht, err := kademlia.NewDHT(ctx, s.store, s.metaStore, kadOpts, s.rqstore)

Expand Down
34 changes: 34 additions & 0 deletions pkg/lumera/modules/supernode/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,37 @@ func GetLatestIP(supernode *types.SuperNode) (string, error) {

return supernode.PrevIpAddresses[0].Address, nil
}

// GetSupernodeWithLatestAddress gets a supernode by account address and returns comprehensive info
func (m *module) GetSupernodeWithLatestAddress(ctx context.Context, address string) (*SuperNodeInfo, error) {
supernode, err := m.GetSupernodeBySupernodeAddress(ctx, address)
if err != nil {
return nil, fmt.Errorf("failed to get supernode: %w", err)
}

// Get latest IP address
var latestAddress string
if len(supernode.PrevIpAddresses) > 0 {
sort.Slice(supernode.PrevIpAddresses, func(i, j int) bool {
return supernode.PrevIpAddresses[i].GetHeight() > supernode.PrevIpAddresses[j].GetHeight()
})
latestAddress = supernode.PrevIpAddresses[0].Address
}

// Get latest state
var currentState string
if len(supernode.States) > 0 {
sort.Slice(supernode.States, func(i, j int) bool {
return supernode.States[i].Height > supernode.States[j].Height
})
currentState = supernode.States[0].State.String()
}

return &SuperNodeInfo{
SupernodeAccount: supernode.SupernodeAccount,
ValidatorAddress: supernode.ValidatorAddress,
P2PPort: supernode.P2PPort,
LatestAddress: latestAddress,
CurrentState: currentState,
}, nil
}
10 changes: 10 additions & 0 deletions pkg/lumera/modules/supernode/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,21 @@ import (
"google.golang.org/grpc"
)

// SuperNodeInfo contains processed supernode information with latest state and address
type SuperNodeInfo struct {
SupernodeAccount string `json:"supernode_account"`
ValidatorAddress string `json:"validator_address"`
P2PPort string `json:"p2p_port"`
LatestAddress string `json:"latest_address"`
CurrentState string `json:"current_state"`
}

// Module defines the interface for interacting with the supernode module
type Module interface {
GetTopSuperNodesForBlock(ctx context.Context, blockHeight uint64) (*types.QueryGetTopSuperNodesForBlockResponse, error)
GetSuperNode(ctx context.Context, address string) (*types.QueryGetSuperNodeResponse, error)
GetSupernodeBySupernodeAddress(ctx context.Context, address string) (*types.SuperNode, error)
GetSupernodeWithLatestAddress(ctx context.Context, address string) (*SuperNodeInfo, error)
GetParams(ctx context.Context) (*types.QueryParamsResponse, error)
}

Expand Down
15 changes: 15 additions & 0 deletions pkg/lumera/modules/supernode/supernode_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions pkg/testutil/lumera.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,16 @@ func (m *MockSupernodeModule) GetParams(ctx context.Context) (*supernodeTypes.Qu
return &supernodeTypes.QueryParamsResponse{}, nil
}

func (m *MockSupernodeModule) GetSupernodeWithLatestAddress(ctx context.Context, address string) (*supernode.SuperNodeInfo, error) {
return &supernode.SuperNodeInfo{
SupernodeAccount: address,
ValidatorAddress: "validator_" + address,
P2PPort: "4445",
LatestAddress: "127.0.0.1:9000",
CurrentState: "SUPERNODE_STATE_ACTIVE",
}, nil
}

// MockTxModule implements the tx.Module interface for testing
type MockTxModule struct{}

Expand Down
57 changes: 0 additions & 57 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"math"
"math/big"
"net"
"net/http"
"os"
"os/exec"
"path/filepath"
Expand All @@ -38,41 +37,6 @@ const (
highCompressionLevel = 4
)

var ipEndpoints = []string{
"https://api.ipify.org",
"https://ifconfig.co/ip",
"https://checkip.amazonaws.com",
"https://ipv4.icanhazip.com",
}

// GetExternalIPAddress returns the first valid public IP obtained
// from a list of providers, or an error if none work.
// func GetExternalIPAddress() (string, error) {
// client := &http.Client{Timeout: 4 * time.Second}

// for _, url := range ipEndpoints {
// req, _ := http.NewRequest(http.MethodGet, url, nil)

// resp, err := client.Do(req)
// if err != nil {
// continue // provider down? try next
// }

// body, err := io.ReadAll(resp.Body)
// resp.Body.Close()
// if err != nil {
// continue
// }

// ip := strings.TrimSpace(string(body))
// if net.ParseIP(ip) != nil {
// return ip, nil
// }
// }

// return "", errors.New("unable to determine external IP address from any provider")
// }

var sem = semaphore.NewWeighted(maxParallelHighCompressCalls)

// DiskStatus cotains info of disk storage
Expand Down Expand Up @@ -133,27 +97,6 @@ func IsContextErr(err error) bool {
return false
}

// GetExternalIPAddress returns external IP address
func GetExternalIPAddress() (externalIP string, err error) {
resp, err := http.Get("https://api.ipify.org")
if err != nil {
return "", err
}

defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}

if net.ParseIP(string(body)) == nil {
return "", errors.Errorf("invalid IP response from %s", "api.ipify.org")
}

return string(body), nil
}

// B64Encode base64 encodes
func B64Encode(in []byte) (out []byte) {
out = make([]byte, base64.StdEncoding.EncodedLen(len(in)))
Expand Down
17 changes: 5 additions & 12 deletions sdk/action/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,24 +159,17 @@ func (c *ClientImpl) GetSupernodeStatus(ctx context.Context, supernodeAddress st

c.logger.Debug(ctx, "Getting supernode status", "address", supernodeAddress)

// Get supernode details from blockchain
supernode, err := c.lumeraClient.GetSupernodeBySupernodeAddress(ctx, supernodeAddress)
// Get supernode info including latest address
supernodeInfo, err := c.lumeraClient.GetSupernodeWithLatestAddress(ctx, supernodeAddress)
if err != nil {
c.logger.Error(ctx, "Failed to get supernode details", "address", supernodeAddress, "error", err)
return nil, fmt.Errorf("failed to get supernode details: %w", err)
c.logger.Error(ctx, "Failed to get supernode info", "address", supernodeAddress, "error", err)
return nil, fmt.Errorf("failed to get supernode info: %w", err)
}

// Get the latest IP address for the supernode
if len(supernode.PrevIpAddresses) == 0 {
return nil, fmt.Errorf("no IP addresses found for supernode %s", supernodeAddress)
}

ipAddress := supernode.PrevIpAddresses[0].Address

// Create lumera supernode object for network client
lumeraSupernode := lumera.Supernode{
CosmosAddress: supernodeAddress,
GrpcEndpoint: ipAddress,
GrpcEndpoint: supernodeInfo.LatestAddress,
State: lumera.SUPERNODE_STATE_ACTIVE, // Assume active since we're querying
}

Expand Down
Loading