Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ p2p:
lumera:
grpc_addr: "localhost:9090"
chain_id: "lumera"
timeout: 10

# RaptorQ Configuration
raptorq:
Expand Down
60 changes: 32 additions & 28 deletions pkg/net/grpc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/reflection"

"github.com/LumeraProtocol/supernode/pkg/log"
"github.com/LumeraProtocol/supernode/pkg/errors"
"github.com/LumeraProtocol/supernode/pkg/log"
)

const (
_ = iota
_ = iota
KB int = 1 << (10 * iota) // 1024
MB // 1048576
GB // 1073741824
Expand Down Expand Up @@ -53,7 +54,7 @@ type Server struct {
listeners []net.Listener
mu sync.RWMutex
done chan struct{}
builder ServerOptionBuilder
builder ServerOptionBuilder
}

// ServiceDesc wraps a gRPC service description and its implementation
Expand All @@ -65,28 +66,28 @@ type ServiceDesc struct {
// ServerOptions contains options for creating a new server
type ServerOptions struct {
// Connection parameters
MaxRecvMsgSize int // Maximum message size the server can receive (in bytes)
MaxSendMsgSize int // Maximum message size the server can send (in bytes)
InitialWindowSize int32 // Initial window size for stream flow control
InitialConnWindowSize int32 // Initial window size for connection flow control
MaxRecvMsgSize int // Maximum message size the server can receive (in bytes)
MaxSendMsgSize int // Maximum message size the server can send (in bytes)
InitialWindowSize int32 // Initial window size for stream flow control
InitialConnWindowSize int32 // Initial window size for connection flow control

// Server parameters
MaxConcurrentStreams uint32 // Maximum number of concurrent streams per connection
GracefulShutdownTime time.Duration // Time to wait for graceful shutdown
MaxConcurrentStreams uint32 // Maximum number of concurrent streams per connection
GracefulShutdownTime time.Duration // Time to wait for graceful shutdown

// Keepalive parameters
MaxConnectionIdle time.Duration // Maximum time a connection can be idle
MaxConnectionAge time.Duration // Maximum time a connection can exist
MaxConnectionAgeGrace time.Duration // Additional time to wait before forcefully closing
Time time.Duration // Time after which server pings client if there's no activity
MaxConnectionIdle time.Duration // Maximum time a connection can be idle
MaxConnectionAge time.Duration // Maximum time a connection can exist
MaxConnectionAgeGrace time.Duration // Additional time to wait before forcefully closing
Time time.Duration // Time after which server pings client if there's no activity
Timeout time.Duration // Time to wait for ping ack before considering the connection dead
MinTime time.Duration // Minimum time client should wait before sending pings
PermitWithoutStream bool // Allow pings even when there are no active streams
PermitWithoutStream bool // Allow pings even when there are no active streams

// Additional options
NumServerWorkers uint32 // Number of server workers (0 means default)
WriteBufferSize int // Size of write buffer
ReadBufferSize int // Size of read buffer
NumServerWorkers uint32 // Number of server workers (0 means default)
WriteBufferSize int // Size of write buffer
ReadBufferSize int // Size of read buffer
}

// DefaultServerOptions returns default server options
Expand All @@ -103,12 +104,12 @@ func DefaultServerOptions() *ServerOptions {
MaxConnectionAge: 2 * time.Hour,
MaxConnectionAgeGrace: 1 * time.Hour,
Time: 1 * time.Hour,
Timeout: 30 * time.Minute,
MinTime: 5 * time.Minute,
PermitWithoutStream: true,
Timeout: 30 * time.Minute,
MinTime: 5 * time.Minute,
PermitWithoutStream: true,

WriteBufferSize: 32 * KB,
ReadBufferSize: 32 * KB,
WriteBufferSize: 32 * KB,
ReadBufferSize: 32 * KB,
}
}

Expand All @@ -122,7 +123,7 @@ func (b *defaultServerOptionBuilder) buildKeepAliveParams(opts *ServerOptions) k
MaxConnectionAge: opts.MaxConnectionAge,
MaxConnectionAgeGrace: opts.MaxConnectionAgeGrace,
Time: opts.Time,
Timeout: opts.Timeout,
Timeout: opts.Timeout,
}
}

Expand All @@ -142,7 +143,7 @@ func NewServer(name string, creds credentials.TransportCredentials) *Server {
services: make([]ServiceDesc, 0),
listeners: make([]net.Listener, 0),
done: make(chan struct{}),
builder: &defaultServerOptionBuilder{},
builder: &defaultServerOptionBuilder{},
}
}

Expand All @@ -154,7 +155,7 @@ func NewServerWithBuilder(name string, creds credentials.TransportCredentials, b
services: make([]ServiceDesc, 0),
listeners: make([]net.Listener, 0),
done: make(chan struct{}),
builder: builder,
builder: builder,
}
}

Expand Down Expand Up @@ -231,6 +232,9 @@ func (s *Server) Serve(ctx context.Context, address string, opts *ServerOptions)
}
s.mu.RUnlock()

// Enable reflection
reflection.Register(s.server)

// Create listener
lis, err := s.createListener(ctx, address)
if err != nil {
Expand Down Expand Up @@ -310,4 +314,4 @@ func (s *Server) Close() error {
return fmt.Errorf("errors closing listeners: %v", errs)
}
return nil
}
}
2 changes: 0 additions & 2 deletions supernode/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ supernode:
keyring:
backend: "os" # Options: test, file, os
dir: "keys" # Keyring directory in home folder
password: "keyring-password" # Only used for 'file' backend

# P2P Network Configuration
p2p:
Expand All @@ -24,7 +23,6 @@ p2p:
lumera:
grpc_addr: "localhost:9090"
chain_id: "lumera"
timeout: 10 # Connection timeout in seconds

# RaptorQ Configuration
raptorq:
Expand Down
10 changes: 6 additions & 4 deletions supernode/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ type SupernodeConfig struct {
}

type KeyringConfig struct {
Backend string `yaml:"backend"`
Dir string `yaml:"dir"`
Password string `yaml:"password"`
Backend string `yaml:"backend"`
Dir string `yaml:"dir"`
}

type P2PConfig struct {
Expand All @@ -34,7 +33,6 @@ type P2PConfig struct {
type LumeraClientConfig struct {
GRPCAddr string `yaml:"grpc_addr"`
ChainID string `yaml:"chain_id"`
Timeout int `yaml:"timeout"`
}

type RaptorQConfig struct {
Expand All @@ -53,10 +51,14 @@ type Config struct {
}

// GetFullPath returns the absolute path by combining base directory with relative path
// If the path is already absolute, it returns the path as-is
func (c *Config) GetFullPath(relativePath string) string {
if relativePath == "" {
return c.BaseDir
}
if filepath.IsAbs(relativePath) {
return relativePath
}
return filepath.Join(c.BaseDir, relativePath)
}

Expand Down
65 changes: 20 additions & 45 deletions supernode/node/supernode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,15 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"

"github.com/LumeraProtocol/lumera/x/lumeraid/securekeyx"
"github.com/LumeraProtocol/supernode/pkg/errgroup"
"github.com/LumeraProtocol/supernode/pkg/errors"
"github.com/LumeraProtocol/supernode/pkg/log"
"github.com/LumeraProtocol/supernode/pkg/lumera"

ltc "github.com/LumeraProtocol/supernode/pkg/net/credentials"
"github.com/LumeraProtocol/supernode/pkg/net/credentials/alts/conn"
grpcserver "github.com/LumeraProtocol/supernode/pkg/net/grpc/server"
"github.com/cosmos/cosmos-sdk/crypto/keyring"
)

Expand All @@ -34,7 +33,7 @@ type Server struct {
services []service
name string
kr keyring.Keyring
grpcServer *grpc.Server
grpcServer *grpcserver.Server
lumeraClient lumera.Client
healthServer *health.Server
}
Expand All @@ -56,44 +55,25 @@ func (server *Server) Run(ctx context.Context) error {
return fmt.Errorf("failed to setup gRPC server: %w", err)
}

// Custom server options
opts := grpcserver.DefaultServerOptions()

// Defaul ServerOptions needs to be updated to hanlde larger files

// opts.GracefulShutdownTime = 60 * time.Second

for _, address := range addresses {
addr := net.JoinHostPort(strings.TrimSpace(address), strconv.Itoa(server.config.Port))
address := addr // Create a new variable to avoid closure issues

group.Go(func() error {
return server.listen(ctx, address)
return server.grpcServer.Serve(ctx, address, opts)
})
}

return group.Wait()
}

func (server *Server) listen(ctx context.Context, address string) (err error) {
listen, err := net.Listen("tcp", address)
if err != nil {
return errors.Errorf("listen: %w", err).WithField("address", address)
}

errCh := make(chan error, 1)
go func() {
defer errors.Recover(func(recErr error) { err = recErr })
log.WithContext(ctx).Infof("gRPC server listening securely on %q", address)
if err := server.grpcServer.Serve(listen); err != nil {
errCh <- errors.Errorf("serve: %w", err).WithField("address", address)
}
}()

select {
case <-ctx.Done():
log.WithContext(ctx).Infof("Shutting down gRPC server at %q", address)
server.grpcServer.GracefulStop()
case err := <-errCh:
return err
}

return nil
}

func (server *Server) setupGRPCServer() error {
// Create server credentials
serverCreds, err := ltc.NewServerCreds(&ltc.ServerOptions{
Expand All @@ -108,24 +88,20 @@ func (server *Server) setupGRPCServer() error {
return fmt.Errorf("failed to create server credentials: %w", err)
}

// Initialize the gRPC server with credentials (secure)
server.grpcServer = grpc.NewServer(grpc.Creds(serverCreds))
// Create ltc server
server.grpcServer = grpcserver.NewServer(server.name, serverCreds)

// Initialize and register the health server
server.healthServer = health.NewServer()
healthpb.RegisterHealthServer(server.grpcServer, server.healthServer)

// Register reflection service
reflection.Register(server.grpcServer)

// Set all services as serving
server.healthServer.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)

// Register all services and set their health status
// Register all services
for _, service := range server.services {
serviceName := service.Desc().ServiceName
server.grpcServer.RegisterService(service.Desc(), service)
server.healthServer.SetServingStatus(serviceName, healthpb.HealthCheckResponse_SERVING)
server.healthServer.SetServingStatus(service.Desc().ServiceName, healthpb.HealthCheckResponse_SERVING)
}

return nil
Expand All @@ -143,16 +119,15 @@ func (server *Server) Close() {
if server.healthServer != nil {
// Set all services to NOT_SERVING before shutdown
server.healthServer.SetServingStatus("", healthpb.HealthCheckResponse_NOT_SERVING)

// Allow a short time for health status to propagate
for _, service := range server.services {
serviceName := service.Desc().ServiceName
server.healthServer.SetServingStatus(serviceName, healthpb.HealthCheckResponse_NOT_SERVING)
}
}

// Wrapper handles all gRPC server cleanup
if server.grpcServer != nil {
server.grpcServer.GracefulStop()
server.grpcServer.Close()
}
}

Expand All @@ -163,10 +138,10 @@ func New(config *Config, name string, kr keyring.Keyring, lumeraClient lumera.Cl
}

return &Server{
config: config,
services: services,
name: name,
kr: kr,
config: config,
services: services,
name: name,
kr: kr,
lumeraClient: lumeraClient,
}, nil
}