diff --git a/README.md b/README.md index daff1480..a59b270d 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,6 @@ p2p: lumera: grpc_addr: "localhost:9090" chain_id: "lumera" - timeout: 10 # RaptorQ Configuration raptorq: diff --git a/pkg/net/grpc/server/server.go b/pkg/net/grpc/server/server.go index 8246108d..5c189a77 100644 --- a/pkg/net/grpc/server/server.go +++ b/pkg/net/grpc/server/server.go @@ -13,15 +13,16 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/keepalive" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/keepalive" + "google.golang.org/grpc/reflection" - "github.com/LumeraProtocol/supernode/pkg/log" "github.com/LumeraProtocol/supernode/pkg/errors" + "github.com/LumeraProtocol/supernode/pkg/log" ) const ( - _ = iota + _ = iota KB int = 1 << (10 * iota) // 1024 MB // 1048576 GB // 1073741824 @@ -53,7 +54,7 @@ type Server struct { listeners []net.Listener mu sync.RWMutex done chan struct{} - builder ServerOptionBuilder + builder ServerOptionBuilder } // ServiceDesc wraps a gRPC service description and its implementation @@ -65,28 +66,28 @@ type ServiceDesc struct { // ServerOptions contains options for creating a new server type ServerOptions struct { // Connection parameters - MaxRecvMsgSize int // Maximum message size the server can receive (in bytes) - MaxSendMsgSize int // Maximum message size the server can send (in bytes) - InitialWindowSize int32 // Initial window size for stream flow control - InitialConnWindowSize int32 // Initial window size for connection flow control - + MaxRecvMsgSize int // Maximum message size the server can receive (in bytes) + MaxSendMsgSize int // Maximum message size the server can send (in bytes) + InitialWindowSize int32 // Initial window size for stream flow control + InitialConnWindowSize int32 // Initial window size for connection flow control + // Server parameters - MaxConcurrentStreams uint32 // Maximum number of concurrent streams per connection - GracefulShutdownTime time.Duration // Time to wait for graceful shutdown + MaxConcurrentStreams uint32 // Maximum number of concurrent streams per connection + GracefulShutdownTime time.Duration // Time to wait for graceful shutdown // Keepalive parameters - MaxConnectionIdle time.Duration // Maximum time a connection can be idle - MaxConnectionAge time.Duration // Maximum time a connection can exist - MaxConnectionAgeGrace time.Duration // Additional time to wait before forcefully closing - Time time.Duration // Time after which server pings client if there's no activity + MaxConnectionIdle time.Duration // Maximum time a connection can be idle + MaxConnectionAge time.Duration // Maximum time a connection can exist + MaxConnectionAgeGrace time.Duration // Additional time to wait before forcefully closing + Time time.Duration // Time after which server pings client if there's no activity Timeout time.Duration // Time to wait for ping ack before considering the connection dead MinTime time.Duration // Minimum time client should wait before sending pings - PermitWithoutStream bool // Allow pings even when there are no active streams + PermitWithoutStream bool // Allow pings even when there are no active streams // Additional options - NumServerWorkers uint32 // Number of server workers (0 means default) - WriteBufferSize int // Size of write buffer - ReadBufferSize int // Size of read buffer + NumServerWorkers uint32 // Number of server workers (0 means default) + WriteBufferSize int // Size of write buffer + ReadBufferSize int // Size of read buffer } // DefaultServerOptions returns default server options @@ -103,12 +104,12 @@ func DefaultServerOptions() *ServerOptions { MaxConnectionAge: 2 * time.Hour, MaxConnectionAgeGrace: 1 * time.Hour, Time: 1 * time.Hour, - Timeout: 30 * time.Minute, - MinTime: 5 * time.Minute, - PermitWithoutStream: true, + Timeout: 30 * time.Minute, + MinTime: 5 * time.Minute, + PermitWithoutStream: true, - WriteBufferSize: 32 * KB, - ReadBufferSize: 32 * KB, + WriteBufferSize: 32 * KB, + ReadBufferSize: 32 * KB, } } @@ -122,7 +123,7 @@ func (b *defaultServerOptionBuilder) buildKeepAliveParams(opts *ServerOptions) k MaxConnectionAge: opts.MaxConnectionAge, MaxConnectionAgeGrace: opts.MaxConnectionAgeGrace, Time: opts.Time, - Timeout: opts.Timeout, + Timeout: opts.Timeout, } } @@ -142,7 +143,7 @@ func NewServer(name string, creds credentials.TransportCredentials) *Server { services: make([]ServiceDesc, 0), listeners: make([]net.Listener, 0), done: make(chan struct{}), - builder: &defaultServerOptionBuilder{}, + builder: &defaultServerOptionBuilder{}, } } @@ -154,7 +155,7 @@ func NewServerWithBuilder(name string, creds credentials.TransportCredentials, b services: make([]ServiceDesc, 0), listeners: make([]net.Listener, 0), done: make(chan struct{}), - builder: builder, + builder: builder, } } @@ -231,6 +232,9 @@ func (s *Server) Serve(ctx context.Context, address string, opts *ServerOptions) } s.mu.RUnlock() + // Enable reflection + reflection.Register(s.server) + // Create listener lis, err := s.createListener(ctx, address) if err != nil { @@ -310,4 +314,4 @@ func (s *Server) Close() error { return fmt.Errorf("errors closing listeners: %v", errs) } return nil -} \ No newline at end of file +} diff --git a/supernode/config.yml b/supernode/config.yml index 19750a7d..90c7bed4 100644 --- a/supernode/config.yml +++ b/supernode/config.yml @@ -10,7 +10,6 @@ supernode: keyring: backend: "os" # Options: test, file, os dir: "keys" # Keyring directory in home folder - password: "keyring-password" # Only used for 'file' backend # P2P Network Configuration p2p: @@ -24,7 +23,6 @@ p2p: lumera: grpc_addr: "localhost:9090" chain_id: "lumera" - timeout: 10 # Connection timeout in seconds # RaptorQ Configuration raptorq: diff --git a/supernode/config/config.go b/supernode/config/config.go index 634f39a2..b61d3f06 100644 --- a/supernode/config/config.go +++ b/supernode/config/config.go @@ -18,9 +18,8 @@ type SupernodeConfig struct { } type KeyringConfig struct { - Backend string `yaml:"backend"` - Dir string `yaml:"dir"` - Password string `yaml:"password"` + Backend string `yaml:"backend"` + Dir string `yaml:"dir"` } type P2PConfig struct { @@ -34,7 +33,6 @@ type P2PConfig struct { type LumeraClientConfig struct { GRPCAddr string `yaml:"grpc_addr"` ChainID string `yaml:"chain_id"` - Timeout int `yaml:"timeout"` } type RaptorQConfig struct { @@ -53,10 +51,14 @@ type Config struct { } // GetFullPath returns the absolute path by combining base directory with relative path +// If the path is already absolute, it returns the path as-is func (c *Config) GetFullPath(relativePath string) string { if relativePath == "" { return c.BaseDir } + if filepath.IsAbs(relativePath) { + return relativePath + } return filepath.Join(c.BaseDir, relativePath) } diff --git a/supernode/node/supernode/server/server.go b/supernode/node/supernode/server/server.go index 45e3dbed..efe28924 100644 --- a/supernode/node/supernode/server/server.go +++ b/supernode/node/supernode/server/server.go @@ -11,16 +11,15 @@ import ( "google.golang.org/grpc/grpclog" "google.golang.org/grpc/health" healthpb "google.golang.org/grpc/health/grpc_health_v1" - "google.golang.org/grpc/reflection" "github.com/LumeraProtocol/lumera/x/lumeraid/securekeyx" "github.com/LumeraProtocol/supernode/pkg/errgroup" - "github.com/LumeraProtocol/supernode/pkg/errors" "github.com/LumeraProtocol/supernode/pkg/log" "github.com/LumeraProtocol/supernode/pkg/lumera" ltc "github.com/LumeraProtocol/supernode/pkg/net/credentials" "github.com/LumeraProtocol/supernode/pkg/net/credentials/alts/conn" + grpcserver "github.com/LumeraProtocol/supernode/pkg/net/grpc/server" "github.com/cosmos/cosmos-sdk/crypto/keyring" ) @@ -34,7 +33,7 @@ type Server struct { services []service name string kr keyring.Keyring - grpcServer *grpc.Server + grpcServer *grpcserver.Server lumeraClient lumera.Client healthServer *health.Server } @@ -56,44 +55,25 @@ func (server *Server) Run(ctx context.Context) error { return fmt.Errorf("failed to setup gRPC server: %w", err) } + // Custom server options + opts := grpcserver.DefaultServerOptions() + + // Defaul ServerOptions needs to be updated to hanlde larger files + + // opts.GracefulShutdownTime = 60 * time.Second + for _, address := range addresses { addr := net.JoinHostPort(strings.TrimSpace(address), strconv.Itoa(server.config.Port)) address := addr // Create a new variable to avoid closure issues group.Go(func() error { - return server.listen(ctx, address) + return server.grpcServer.Serve(ctx, address, opts) }) } return group.Wait() } -func (server *Server) listen(ctx context.Context, address string) (err error) { - listen, err := net.Listen("tcp", address) - if err != nil { - return errors.Errorf("listen: %w", err).WithField("address", address) - } - - errCh := make(chan error, 1) - go func() { - defer errors.Recover(func(recErr error) { err = recErr }) - log.WithContext(ctx).Infof("gRPC server listening securely on %q", address) - if err := server.grpcServer.Serve(listen); err != nil { - errCh <- errors.Errorf("serve: %w", err).WithField("address", address) - } - }() - - select { - case <-ctx.Done(): - log.WithContext(ctx).Infof("Shutting down gRPC server at %q", address) - server.grpcServer.GracefulStop() - case err := <-errCh: - return err - } - - return nil -} - func (server *Server) setupGRPCServer() error { // Create server credentials serverCreds, err := ltc.NewServerCreds(<c.ServerOptions{ @@ -108,24 +88,20 @@ func (server *Server) setupGRPCServer() error { return fmt.Errorf("failed to create server credentials: %w", err) } - // Initialize the gRPC server with credentials (secure) - server.grpcServer = grpc.NewServer(grpc.Creds(serverCreds)) + // Create ltc server + server.grpcServer = grpcserver.NewServer(server.name, serverCreds) // Initialize and register the health server server.healthServer = health.NewServer() healthpb.RegisterHealthServer(server.grpcServer, server.healthServer) - // Register reflection service - reflection.Register(server.grpcServer) - // Set all services as serving server.healthServer.SetServingStatus("", healthpb.HealthCheckResponse_SERVING) - // Register all services and set their health status + // Register all services for _, service := range server.services { - serviceName := service.Desc().ServiceName server.grpcServer.RegisterService(service.Desc(), service) - server.healthServer.SetServingStatus(serviceName, healthpb.HealthCheckResponse_SERVING) + server.healthServer.SetServingStatus(service.Desc().ServiceName, healthpb.HealthCheckResponse_SERVING) } return nil @@ -143,16 +119,15 @@ func (server *Server) Close() { if server.healthServer != nil { // Set all services to NOT_SERVING before shutdown server.healthServer.SetServingStatus("", healthpb.HealthCheckResponse_NOT_SERVING) - - // Allow a short time for health status to propagate for _, service := range server.services { serviceName := service.Desc().ServiceName server.healthServer.SetServingStatus(serviceName, healthpb.HealthCheckResponse_NOT_SERVING) } } + // Wrapper handles all gRPC server cleanup if server.grpcServer != nil { - server.grpcServer.GracefulStop() + server.grpcServer.Close() } } @@ -163,10 +138,10 @@ func New(config *Config, name string, kr keyring.Keyring, lumeraClient lumera.Cl } return &Server{ - config: config, - services: services, - name: name, - kr: kr, + config: config, + services: services, + name: name, + kr: kr, lumeraClient: lumeraClient, }, nil }