From 57db4d95244499ffbf75968b09775a0a718f355f Mon Sep 17 00:00:00 2001 From: j-rafique Date: Thu, 3 Jul 2025 19:52:19 +0500 Subject: [PATCH 1/2] feat: Implement SupernodeService for system status reporting - Added gRPC service SupernodeService with GetStatus method to provide system status. - Created StatusRequest and StatusResponse messages to encapsulate request and response data. - Integrated CPU and memory metrics retrieval using gopsutil library in SupernodeStatusService. - Removed HealthCheck RPC from CascadeService and replaced it with a centralized status service. - Updated CascadeAdapter to utilize the new SupernodeService for status checks. - Implemented unit tests for SupernodeServer and SupernodeStatusService to ensure correct functionality. - Refactored existing health check logic into common service for better reusability. --- Makefile | 9 + gen/supernode/action/cascade/service.pb.go | 374 +++------------- .../action/cascade/service_grpc.pb.go | 47 +- gen/supernode/supernode.pb.go | 420 ++++++++++++++++++ gen/supernode/supernode_grpc.pb.go | 125 ++++++ proto/supernode/action/cascade/service.proto | 21 - proto/supernode/supernode.proto | 37 ++ sdk/adapters/supernodeservice/adapter.go | 27 +- sdk/net/impl.go | 3 +- supernode/cmd/start.go | 6 + .../server/cascade/cascade_action_server.go | 23 - .../node/supernode/server/status_server.go | 65 +++ .../supernode/server/status_server_test.go | 80 ++++ supernode/services/cascade/healthcheck.go | 66 +-- .../services/cascade/healthcheck_test.go | 24 +- supernode/services/cascade/service.go | 14 + supernode/services/common/supernode_status.go | 125 ++++++ .../services/common/supernode_status_test.go | 118 +++++ supernode/services/common/test_helpers.go | 15 + 19 files changed, 1126 insertions(+), 473 deletions(-) create mode 100644 gen/supernode/supernode.pb.go create mode 100644 gen/supernode/supernode_grpc.pb.go create mode 100644 proto/supernode/supernode.proto create mode 100644 supernode/node/supernode/server/status_server.go create mode 100644 supernode/node/supernode/server/status_server_test.go create mode 100644 supernode/services/common/supernode_status.go create mode 100644 supernode/services/common/supernode_status_test.go create mode 100644 supernode/services/common/test_helpers.go diff --git a/Makefile b/Makefile index fe4536d9..362531f4 100644 --- a/Makefile +++ b/Makefile @@ -48,6 +48,15 @@ gen-cascade: --go-grpc_opt=paths=source_relative \ proto/supernode/action/cascade/service.proto +gen-supernode: + protoc \ + --proto_path=proto \ + --go_out=gen \ + --go_opt=paths=source_relative \ + --go-grpc_out=gen \ + --go-grpc_opt=paths=source_relative \ + proto/supernode/supernode.proto + # Define the paths SUPERNODE_SRC=supernode/main.go DATA_DIR=tests/system/supernode-data1 diff --git a/gen/supernode/action/cascade/service.pb.go b/gen/supernode/action/cascade/service.pb.go index d084e367..ded0dd47 100644 --- a/gen/supernode/action/cascade/service.pb.go +++ b/gen/supernode/action/cascade/service.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.35.2 -// protoc v3.12.4 +// protoc v3.21.12 // source: supernode/action/cascade/service.proto package cascade @@ -514,226 +514,6 @@ func (x *DownloadEvent) GetMessage() string { return "" } -type HealthCheckRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields -} - -func (x *HealthCheckRequest) Reset() { - *x = HealthCheckRequest{} - mi := &file_supernode_action_cascade_service_proto_msgTypes[7] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *HealthCheckRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*HealthCheckRequest) ProtoMessage() {} - -func (x *HealthCheckRequest) ProtoReflect() protoreflect.Message { - mi := &file_supernode_action_cascade_service_proto_msgTypes[7] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use HealthCheckRequest.ProtoReflect.Descriptor instead. -func (*HealthCheckRequest) Descriptor() ([]byte, []int) { - return file_supernode_action_cascade_service_proto_rawDescGZIP(), []int{7} -} - -// The HealthCheckResponse represents system health status. -type HealthCheckResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Cpu *HealthCheckResponse_CPU `protobuf:"bytes,1,opt,name=cpu,proto3" json:"cpu,omitempty"` - Memory *HealthCheckResponse_Memory `protobuf:"bytes,2,opt,name=memory,proto3" json:"memory,omitempty"` - TasksInProgress []string `protobuf:"bytes,3,rep,name=tasks_in_progress,json=tasksInProgress,proto3" json:"tasks_in_progress,omitempty"` -} - -func (x *HealthCheckResponse) Reset() { - *x = HealthCheckResponse{} - mi := &file_supernode_action_cascade_service_proto_msgTypes[8] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *HealthCheckResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*HealthCheckResponse) ProtoMessage() {} - -func (x *HealthCheckResponse) ProtoReflect() protoreflect.Message { - mi := &file_supernode_action_cascade_service_proto_msgTypes[8] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use HealthCheckResponse.ProtoReflect.Descriptor instead. -func (*HealthCheckResponse) Descriptor() ([]byte, []int) { - return file_supernode_action_cascade_service_proto_rawDescGZIP(), []int{8} -} - -func (x *HealthCheckResponse) GetCpu() *HealthCheckResponse_CPU { - if x != nil { - return x.Cpu - } - return nil -} - -func (x *HealthCheckResponse) GetMemory() *HealthCheckResponse_Memory { - if x != nil { - return x.Memory - } - return nil -} - -func (x *HealthCheckResponse) GetTasksInProgress() []string { - if x != nil { - return x.TasksInProgress - } - return nil -} - -type HealthCheckResponse_CPU struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Usage string `protobuf:"bytes,1,opt,name=usage,proto3" json:"usage,omitempty"` - Remaining string `protobuf:"bytes,2,opt,name=remaining,proto3" json:"remaining,omitempty"` -} - -func (x *HealthCheckResponse_CPU) Reset() { - *x = HealthCheckResponse_CPU{} - mi := &file_supernode_action_cascade_service_proto_msgTypes[9] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *HealthCheckResponse_CPU) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*HealthCheckResponse_CPU) ProtoMessage() {} - -func (x *HealthCheckResponse_CPU) ProtoReflect() protoreflect.Message { - mi := &file_supernode_action_cascade_service_proto_msgTypes[9] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use HealthCheckResponse_CPU.ProtoReflect.Descriptor instead. -func (*HealthCheckResponse_CPU) Descriptor() ([]byte, []int) { - return file_supernode_action_cascade_service_proto_rawDescGZIP(), []int{8, 0} -} - -func (x *HealthCheckResponse_CPU) GetUsage() string { - if x != nil { - return x.Usage - } - return "" -} - -func (x *HealthCheckResponse_CPU) GetRemaining() string { - if x != nil { - return x.Remaining - } - return "" -} - -type HealthCheckResponse_Memory struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Total uint64 `protobuf:"varint,1,opt,name=total,proto3" json:"total,omitempty"` - Used uint64 `protobuf:"varint,2,opt,name=used,proto3" json:"used,omitempty"` - Available uint64 `protobuf:"varint,3,opt,name=available,proto3" json:"available,omitempty"` - UsedPerc float64 `protobuf:"fixed64,4,opt,name=used_perc,json=usedPerc,proto3" json:"used_perc,omitempty"` -} - -func (x *HealthCheckResponse_Memory) Reset() { - *x = HealthCheckResponse_Memory{} - mi := &file_supernode_action_cascade_service_proto_msgTypes[10] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *HealthCheckResponse_Memory) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*HealthCheckResponse_Memory) ProtoMessage() {} - -func (x *HealthCheckResponse_Memory) ProtoReflect() protoreflect.Message { - mi := &file_supernode_action_cascade_service_proto_msgTypes[10] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use HealthCheckResponse_Memory.ProtoReflect.Descriptor instead. -func (*HealthCheckResponse_Memory) Descriptor() ([]byte, []int) { - return file_supernode_action_cascade_service_proto_rawDescGZIP(), []int{8, 1} -} - -func (x *HealthCheckResponse_Memory) GetTotal() uint64 { - if x != nil { - return x.Total - } - return 0 -} - -func (x *HealthCheckResponse_Memory) GetUsed() uint64 { - if x != nil { - return x.Used - } - return 0 -} - -func (x *HealthCheckResponse_Memory) GetAvailable() uint64 { - if x != nil { - return x.Available - } - return 0 -} - -func (x *HealthCheckResponse_Memory) GetUsedPerc() float64 { - if x != nil { - return x.UsedPerc - } - return 0 -} - var File_supernode_action_cascade_service_proto protoreflect.FileDescriptor var file_supernode_action_cascade_service_proto_rawDesc = []byte{ @@ -780,68 +560,40 @@ var file_supernode_action_cascade_service_proto_rawDesc = []byte{ 0x64, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x22, 0x14, 0x0a, 0x12, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0xdc, 0x02, 0x0a, 0x13, 0x48, 0x65, 0x61, 0x6c, 0x74, - 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x32, - 0x0a, 0x03, 0x63, 0x70, 0x75, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x63, 0x61, - 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, - 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x43, 0x50, 0x55, 0x52, 0x03, 0x63, - 0x70, 0x75, 0x12, 0x3b, 0x0a, 0x06, 0x6d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x48, 0x65, 0x61, - 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x2e, 0x4d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x52, 0x06, 0x6d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x12, - 0x2a, 0x0a, 0x11, 0x74, 0x61, 0x73, 0x6b, 0x73, 0x5f, 0x69, 0x6e, 0x5f, 0x70, 0x72, 0x6f, 0x67, - 0x72, 0x65, 0x73, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0f, 0x74, 0x61, 0x73, 0x6b, - 0x73, 0x49, 0x6e, 0x50, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x1a, 0x39, 0x0a, 0x03, 0x43, - 0x50, 0x55, 0x12, 0x14, 0x0a, 0x05, 0x75, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x05, 0x75, 0x73, 0x61, 0x67, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x72, 0x65, 0x6d, 0x61, - 0x69, 0x6e, 0x69, 0x6e, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x6d, - 0x61, 0x69, 0x6e, 0x69, 0x6e, 0x67, 0x1a, 0x6d, 0x0a, 0x06, 0x4d, 0x65, 0x6d, 0x6f, 0x72, 0x79, - 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, - 0x05, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x12, 0x12, 0x0a, 0x04, 0x75, 0x73, 0x65, 0x64, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, 0x75, 0x73, 0x65, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x61, 0x76, - 0x61, 0x69, 0x6c, 0x61, 0x62, 0x6c, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x61, - 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x6c, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x75, 0x73, 0x65, 0x64, - 0x5f, 0x70, 0x65, 0x72, 0x63, 0x18, 0x04, 0x20, 0x01, 0x28, 0x01, 0x52, 0x08, 0x75, 0x73, 0x65, - 0x64, 0x50, 0x65, 0x72, 0x63, 0x2a, 0xb6, 0x02, 0x0a, 0x12, 0x53, 0x75, 0x70, 0x65, 0x72, 0x6e, - 0x6f, 0x64, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0b, 0x0a, 0x07, - 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x14, 0x0a, 0x10, 0x41, 0x43, 0x54, - 0x49, 0x4f, 0x4e, 0x5f, 0x52, 0x45, 0x54, 0x52, 0x49, 0x45, 0x56, 0x45, 0x44, 0x10, 0x01, 0x12, - 0x17, 0x0a, 0x13, 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x46, 0x45, 0x45, 0x5f, 0x56, 0x45, - 0x52, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x02, 0x12, 0x1e, 0x0a, 0x1a, 0x54, 0x4f, 0x50, 0x5f, - 0x53, 0x55, 0x50, 0x45, 0x52, 0x4e, 0x4f, 0x44, 0x45, 0x5f, 0x43, 0x48, 0x45, 0x43, 0x4b, 0x5f, - 0x50, 0x41, 0x53, 0x53, 0x45, 0x44, 0x10, 0x03, 0x12, 0x14, 0x0a, 0x10, 0x4d, 0x45, 0x54, 0x41, - 0x44, 0x41, 0x54, 0x41, 0x5f, 0x44, 0x45, 0x43, 0x4f, 0x44, 0x45, 0x44, 0x10, 0x04, 0x12, 0x16, - 0x0a, 0x12, 0x44, 0x41, 0x54, 0x41, 0x5f, 0x48, 0x41, 0x53, 0x48, 0x5f, 0x56, 0x45, 0x52, 0x49, - 0x46, 0x49, 0x45, 0x44, 0x10, 0x05, 0x12, 0x11, 0x0a, 0x0d, 0x49, 0x4e, 0x50, 0x55, 0x54, 0x5f, - 0x45, 0x4e, 0x43, 0x4f, 0x44, 0x45, 0x44, 0x10, 0x06, 0x12, 0x16, 0x0a, 0x12, 0x53, 0x49, 0x47, - 0x4e, 0x41, 0x54, 0x55, 0x52, 0x45, 0x5f, 0x56, 0x45, 0x52, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, - 0x07, 0x12, 0x12, 0x0a, 0x0e, 0x52, 0x51, 0x49, 0x44, 0x5f, 0x47, 0x45, 0x4e, 0x45, 0x52, 0x41, - 0x54, 0x45, 0x44, 0x10, 0x08, 0x12, 0x11, 0x0a, 0x0d, 0x52, 0x51, 0x49, 0x44, 0x5f, 0x56, 0x45, - 0x52, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x09, 0x12, 0x14, 0x0a, 0x10, 0x41, 0x52, 0x54, 0x45, - 0x46, 0x41, 0x43, 0x54, 0x53, 0x5f, 0x53, 0x54, 0x4f, 0x52, 0x45, 0x44, 0x10, 0x0a, 0x12, 0x14, - 0x0a, 0x10, 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x46, 0x49, 0x4e, 0x41, 0x4c, 0x49, 0x5a, - 0x45, 0x44, 0x10, 0x0b, 0x12, 0x18, 0x0a, 0x14, 0x41, 0x52, 0x54, 0x45, 0x46, 0x41, 0x43, 0x54, - 0x53, 0x5f, 0x44, 0x4f, 0x57, 0x4e, 0x4c, 0x4f, 0x41, 0x44, 0x45, 0x44, 0x10, 0x0c, 0x32, 0xe2, - 0x01, 0x0a, 0x0e, 0x43, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, - 0x65, 0x12, 0x43, 0x0a, 0x08, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x12, 0x18, 0x2e, - 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, + 0x2a, 0xb6, 0x02, 0x0a, 0x12, 0x53, 0x75, 0x70, 0x65, 0x72, 0x6e, 0x6f, 0x64, 0x65, 0x45, 0x76, + 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, + 0x57, 0x4e, 0x10, 0x00, 0x12, 0x14, 0x0a, 0x10, 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x52, + 0x45, 0x54, 0x52, 0x49, 0x45, 0x56, 0x45, 0x44, 0x10, 0x01, 0x12, 0x17, 0x0a, 0x13, 0x41, 0x43, + 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x46, 0x45, 0x45, 0x5f, 0x56, 0x45, 0x52, 0x49, 0x46, 0x49, 0x45, + 0x44, 0x10, 0x02, 0x12, 0x1e, 0x0a, 0x1a, 0x54, 0x4f, 0x50, 0x5f, 0x53, 0x55, 0x50, 0x45, 0x52, + 0x4e, 0x4f, 0x44, 0x45, 0x5f, 0x43, 0x48, 0x45, 0x43, 0x4b, 0x5f, 0x50, 0x41, 0x53, 0x53, 0x45, + 0x44, 0x10, 0x03, 0x12, 0x14, 0x0a, 0x10, 0x4d, 0x45, 0x54, 0x41, 0x44, 0x41, 0x54, 0x41, 0x5f, + 0x44, 0x45, 0x43, 0x4f, 0x44, 0x45, 0x44, 0x10, 0x04, 0x12, 0x16, 0x0a, 0x12, 0x44, 0x41, 0x54, + 0x41, 0x5f, 0x48, 0x41, 0x53, 0x48, 0x5f, 0x56, 0x45, 0x52, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, + 0x05, 0x12, 0x11, 0x0a, 0x0d, 0x49, 0x4e, 0x50, 0x55, 0x54, 0x5f, 0x45, 0x4e, 0x43, 0x4f, 0x44, + 0x45, 0x44, 0x10, 0x06, 0x12, 0x16, 0x0a, 0x12, 0x53, 0x49, 0x47, 0x4e, 0x41, 0x54, 0x55, 0x52, + 0x45, 0x5f, 0x56, 0x45, 0x52, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x07, 0x12, 0x12, 0x0a, 0x0e, + 0x52, 0x51, 0x49, 0x44, 0x5f, 0x47, 0x45, 0x4e, 0x45, 0x52, 0x41, 0x54, 0x45, 0x44, 0x10, 0x08, + 0x12, 0x11, 0x0a, 0x0d, 0x52, 0x51, 0x49, 0x44, 0x5f, 0x56, 0x45, 0x52, 0x49, 0x46, 0x49, 0x45, + 0x44, 0x10, 0x09, 0x12, 0x14, 0x0a, 0x10, 0x41, 0x52, 0x54, 0x45, 0x46, 0x41, 0x43, 0x54, 0x53, + 0x5f, 0x53, 0x54, 0x4f, 0x52, 0x45, 0x44, 0x10, 0x0a, 0x12, 0x14, 0x0a, 0x10, 0x41, 0x43, 0x54, + 0x49, 0x4f, 0x4e, 0x5f, 0x46, 0x49, 0x4e, 0x41, 0x4c, 0x49, 0x5a, 0x45, 0x44, 0x10, 0x0b, 0x12, + 0x18, 0x0a, 0x14, 0x41, 0x52, 0x54, 0x45, 0x46, 0x41, 0x43, 0x54, 0x53, 0x5f, 0x44, 0x4f, 0x57, + 0x4e, 0x4c, 0x4f, 0x41, 0x44, 0x45, 0x44, 0x10, 0x0c, 0x32, 0x98, 0x01, 0x0a, 0x0e, 0x43, 0x61, + 0x73, 0x63, 0x61, 0x64, 0x65, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x43, 0x0a, 0x08, + 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x12, 0x18, 0x2e, 0x63, 0x61, 0x73, 0x63, 0x61, + 0x64, 0x65, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x52, 0x65, 0x67, + 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, + 0x01, 0x12, 0x41, 0x0a, 0x08, 0x44, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x18, 0x2e, + 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x44, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, - 0x65, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, 0x48, 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, - 0x43, 0x68, 0x65, 0x63, 0x6b, 0x12, 0x1b, 0x2e, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, - 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x48, 0x65, 0x61, - 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x41, 0x0a, 0x08, 0x44, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x18, 0x2e, 0x63, - 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x44, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, - 0x2e, 0x44, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x30, 0x01, 0x42, 0x42, 0x5a, 0x40, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, - 0x6d, 0x2f, 0x4c, 0x75, 0x6d, 0x65, 0x72, 0x61, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, - 0x2f, 0x73, 0x75, 0x70, 0x65, 0x72, 0x6e, 0x6f, 0x64, 0x65, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x73, - 0x75, 0x70, 0x65, 0x72, 0x6e, 0x6f, 0x64, 0x65, 0x2f, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2f, - 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x65, 0x2e, 0x44, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x30, 0x01, 0x42, 0x42, 0x5a, 0x40, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x6f, 0x6d, 0x2f, 0x4c, 0x75, 0x6d, 0x65, 0x72, 0x61, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, + 0x6c, 0x2f, 0x73, 0x75, 0x70, 0x65, 0x72, 0x6e, 0x6f, 0x64, 0x65, 0x2f, 0x67, 0x65, 0x6e, 0x2f, + 0x73, 0x75, 0x70, 0x65, 0x72, 0x6e, 0x6f, 0x64, 0x65, 0x2f, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, + 0x2f, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -857,41 +609,33 @@ func file_supernode_action_cascade_service_proto_rawDescGZIP() []byte { } var file_supernode_action_cascade_service_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_supernode_action_cascade_service_proto_msgTypes = make([]protoimpl.MessageInfo, 11) +var file_supernode_action_cascade_service_proto_msgTypes = make([]protoimpl.MessageInfo, 7) var file_supernode_action_cascade_service_proto_goTypes = []any{ - (SupernodeEventType)(0), // 0: cascade.SupernodeEventType - (*RegisterRequest)(nil), // 1: cascade.RegisterRequest - (*DataChunk)(nil), // 2: cascade.DataChunk - (*Metadata)(nil), // 3: cascade.Metadata - (*RegisterResponse)(nil), // 4: cascade.RegisterResponse - (*DownloadRequest)(nil), // 5: cascade.DownloadRequest - (*DownloadResponse)(nil), // 6: cascade.DownloadResponse - (*DownloadEvent)(nil), // 7: cascade.DownloadEvent - (*HealthCheckRequest)(nil), // 8: cascade.HealthCheckRequest - (*HealthCheckResponse)(nil), // 9: cascade.HealthCheckResponse - (*HealthCheckResponse_CPU)(nil), // 10: cascade.HealthCheckResponse.CPU - (*HealthCheckResponse_Memory)(nil), // 11: cascade.HealthCheckResponse.Memory + (SupernodeEventType)(0), // 0: cascade.SupernodeEventType + (*RegisterRequest)(nil), // 1: cascade.RegisterRequest + (*DataChunk)(nil), // 2: cascade.DataChunk + (*Metadata)(nil), // 3: cascade.Metadata + (*RegisterResponse)(nil), // 4: cascade.RegisterResponse + (*DownloadRequest)(nil), // 5: cascade.DownloadRequest + (*DownloadResponse)(nil), // 6: cascade.DownloadResponse + (*DownloadEvent)(nil), // 7: cascade.DownloadEvent } var file_supernode_action_cascade_service_proto_depIdxs = []int32{ - 2, // 0: cascade.RegisterRequest.chunk:type_name -> cascade.DataChunk - 3, // 1: cascade.RegisterRequest.metadata:type_name -> cascade.Metadata - 0, // 2: cascade.RegisterResponse.event_type:type_name -> cascade.SupernodeEventType - 7, // 3: cascade.DownloadResponse.event:type_name -> cascade.DownloadEvent - 2, // 4: cascade.DownloadResponse.chunk:type_name -> cascade.DataChunk - 0, // 5: cascade.DownloadEvent.event_type:type_name -> cascade.SupernodeEventType - 10, // 6: cascade.HealthCheckResponse.cpu:type_name -> cascade.HealthCheckResponse.CPU - 11, // 7: cascade.HealthCheckResponse.memory:type_name -> cascade.HealthCheckResponse.Memory - 1, // 8: cascade.CascadeService.Register:input_type -> cascade.RegisterRequest - 8, // 9: cascade.CascadeService.HealthCheck:input_type -> cascade.HealthCheckRequest - 5, // 10: cascade.CascadeService.Download:input_type -> cascade.DownloadRequest - 4, // 11: cascade.CascadeService.Register:output_type -> cascade.RegisterResponse - 9, // 12: cascade.CascadeService.HealthCheck:output_type -> cascade.HealthCheckResponse - 6, // 13: cascade.CascadeService.Download:output_type -> cascade.DownloadResponse - 11, // [11:14] is the sub-list for method output_type - 8, // [8:11] is the sub-list for method input_type - 8, // [8:8] is the sub-list for extension type_name - 8, // [8:8] is the sub-list for extension extendee - 0, // [0:8] is the sub-list for field type_name + 2, // 0: cascade.RegisterRequest.chunk:type_name -> cascade.DataChunk + 3, // 1: cascade.RegisterRequest.metadata:type_name -> cascade.Metadata + 0, // 2: cascade.RegisterResponse.event_type:type_name -> cascade.SupernodeEventType + 7, // 3: cascade.DownloadResponse.event:type_name -> cascade.DownloadEvent + 2, // 4: cascade.DownloadResponse.chunk:type_name -> cascade.DataChunk + 0, // 5: cascade.DownloadEvent.event_type:type_name -> cascade.SupernodeEventType + 1, // 6: cascade.CascadeService.Register:input_type -> cascade.RegisterRequest + 5, // 7: cascade.CascadeService.Download:input_type -> cascade.DownloadRequest + 4, // 8: cascade.CascadeService.Register:output_type -> cascade.RegisterResponse + 6, // 9: cascade.CascadeService.Download:output_type -> cascade.DownloadResponse + 8, // [8:10] is the sub-list for method output_type + 6, // [6:8] is the sub-list for method input_type + 6, // [6:6] is the sub-list for extension type_name + 6, // [6:6] is the sub-list for extension extendee + 0, // [0:6] is the sub-list for field type_name } func init() { file_supernode_action_cascade_service_proto_init() } @@ -913,7 +657,7 @@ func file_supernode_action_cascade_service_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_supernode_action_cascade_service_proto_rawDesc, NumEnums: 1, - NumMessages: 11, + NumMessages: 7, NumExtensions: 0, NumServices: 1, }, diff --git a/gen/supernode/action/cascade/service_grpc.pb.go b/gen/supernode/action/cascade/service_grpc.pb.go index 576cfea3..fc196a20 100644 --- a/gen/supernode/action/cascade/service_grpc.pb.go +++ b/gen/supernode/action/cascade/service_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.5.1 -// - protoc v3.12.4 +// - protoc v3.21.12 // source: supernode/action/cascade/service.proto package cascade @@ -19,9 +19,8 @@ import ( const _ = grpc.SupportPackageIsVersion9 const ( - CascadeService_Register_FullMethodName = "/cascade.CascadeService/Register" - CascadeService_HealthCheck_FullMethodName = "/cascade.CascadeService/HealthCheck" - CascadeService_Download_FullMethodName = "/cascade.CascadeService/Download" + CascadeService_Register_FullMethodName = "/cascade.CascadeService/Register" + CascadeService_Download_FullMethodName = "/cascade.CascadeService/Download" ) // CascadeServiceClient is the client API for CascadeService service. @@ -29,7 +28,6 @@ const ( // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type CascadeServiceClient interface { Register(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[RegisterRequest, RegisterResponse], error) - HealthCheck(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) Download(ctx context.Context, in *DownloadRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[DownloadResponse], error) } @@ -54,16 +52,6 @@ func (c *cascadeServiceClient) Register(ctx context.Context, opts ...grpc.CallOp // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. type CascadeService_RegisterClient = grpc.BidiStreamingClient[RegisterRequest, RegisterResponse] -func (c *cascadeServiceClient) HealthCheck(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - out := new(HealthCheckResponse) - err := c.cc.Invoke(ctx, CascadeService_HealthCheck_FullMethodName, in, out, cOpts...) - if err != nil { - return nil, err - } - return out, nil -} - func (c *cascadeServiceClient) Download(ctx context.Context, in *DownloadRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[DownloadResponse], error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) stream, err := c.cc.NewStream(ctx, &CascadeService_ServiceDesc.Streams[1], CascadeService_Download_FullMethodName, cOpts...) @@ -88,7 +76,6 @@ type CascadeService_DownloadClient = grpc.ServerStreamingClient[DownloadResponse // for forward compatibility. type CascadeServiceServer interface { Register(grpc.BidiStreamingServer[RegisterRequest, RegisterResponse]) error - HealthCheck(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) Download(*DownloadRequest, grpc.ServerStreamingServer[DownloadResponse]) error mustEmbedUnimplementedCascadeServiceServer() } @@ -103,9 +90,6 @@ type UnimplementedCascadeServiceServer struct{} func (UnimplementedCascadeServiceServer) Register(grpc.BidiStreamingServer[RegisterRequest, RegisterResponse]) error { return status.Errorf(codes.Unimplemented, "method Register not implemented") } -func (UnimplementedCascadeServiceServer) HealthCheck(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method HealthCheck not implemented") -} func (UnimplementedCascadeServiceServer) Download(*DownloadRequest, grpc.ServerStreamingServer[DownloadResponse]) error { return status.Errorf(codes.Unimplemented, "method Download not implemented") } @@ -137,24 +121,6 @@ func _CascadeService_Register_Handler(srv interface{}, stream grpc.ServerStream) // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. type CascadeService_RegisterServer = grpc.BidiStreamingServer[RegisterRequest, RegisterResponse] -func _CascadeService_HealthCheck_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(HealthCheckRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(CascadeServiceServer).HealthCheck(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: CascadeService_HealthCheck_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(CascadeServiceServer).HealthCheck(ctx, req.(*HealthCheckRequest)) - } - return interceptor(ctx, in, info, handler) -} - func _CascadeService_Download_Handler(srv interface{}, stream grpc.ServerStream) error { m := new(DownloadRequest) if err := stream.RecvMsg(m); err != nil { @@ -172,12 +138,7 @@ type CascadeService_DownloadServer = grpc.ServerStreamingServer[DownloadResponse var CascadeService_ServiceDesc = grpc.ServiceDesc{ ServiceName: "cascade.CascadeService", HandlerType: (*CascadeServiceServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "HealthCheck", - Handler: _CascadeService_HealthCheck_Handler, - }, - }, + Methods: []grpc.MethodDesc{}, Streams: []grpc.StreamDesc{ { StreamName: "Register", diff --git a/gen/supernode/supernode.pb.go b/gen/supernode/supernode.pb.go new file mode 100644 index 00000000..0b3610f1 --- /dev/null +++ b/gen/supernode/supernode.pb.go @@ -0,0 +1,420 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.35.2 +// protoc v3.21.12 +// source: supernode/supernode.proto + +package supernode + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type StatusRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *StatusRequest) Reset() { + *x = StatusRequest{} + mi := &file_supernode_supernode_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *StatusRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StatusRequest) ProtoMessage() {} + +func (x *StatusRequest) ProtoReflect() protoreflect.Message { + mi := &file_supernode_supernode_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StatusRequest.ProtoReflect.Descriptor instead. +func (*StatusRequest) Descriptor() ([]byte, []int) { + return file_supernode_supernode_proto_rawDescGZIP(), []int{0} +} + +// The StatusResponse represents system status. +type StatusResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Cpu *StatusResponse_CPU `protobuf:"bytes,1,opt,name=cpu,proto3" json:"cpu,omitempty"` + Memory *StatusResponse_Memory `protobuf:"bytes,2,opt,name=memory,proto3" json:"memory,omitempty"` + Services []*StatusResponse_ServiceTasks `protobuf:"bytes,3,rep,name=services,proto3" json:"services,omitempty"` + AvailableServices []string `protobuf:"bytes,4,rep,name=available_services,json=availableServices,proto3" json:"available_services,omitempty"` +} + +func (x *StatusResponse) Reset() { + *x = StatusResponse{} + mi := &file_supernode_supernode_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *StatusResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StatusResponse) ProtoMessage() {} + +func (x *StatusResponse) ProtoReflect() protoreflect.Message { + mi := &file_supernode_supernode_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StatusResponse.ProtoReflect.Descriptor instead. +func (*StatusResponse) Descriptor() ([]byte, []int) { + return file_supernode_supernode_proto_rawDescGZIP(), []int{1} +} + +func (x *StatusResponse) GetCpu() *StatusResponse_CPU { + if x != nil { + return x.Cpu + } + return nil +} + +func (x *StatusResponse) GetMemory() *StatusResponse_Memory { + if x != nil { + return x.Memory + } + return nil +} + +func (x *StatusResponse) GetServices() []*StatusResponse_ServiceTasks { + if x != nil { + return x.Services + } + return nil +} + +func (x *StatusResponse) GetAvailableServices() []string { + if x != nil { + return x.AvailableServices + } + return nil +} + +type StatusResponse_CPU struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Usage string `protobuf:"bytes,1,opt,name=usage,proto3" json:"usage,omitempty"` + Remaining string `protobuf:"bytes,2,opt,name=remaining,proto3" json:"remaining,omitempty"` +} + +func (x *StatusResponse_CPU) Reset() { + *x = StatusResponse_CPU{} + mi := &file_supernode_supernode_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *StatusResponse_CPU) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StatusResponse_CPU) ProtoMessage() {} + +func (x *StatusResponse_CPU) ProtoReflect() protoreflect.Message { + mi := &file_supernode_supernode_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StatusResponse_CPU.ProtoReflect.Descriptor instead. +func (*StatusResponse_CPU) Descriptor() ([]byte, []int) { + return file_supernode_supernode_proto_rawDescGZIP(), []int{1, 0} +} + +func (x *StatusResponse_CPU) GetUsage() string { + if x != nil { + return x.Usage + } + return "" +} + +func (x *StatusResponse_CPU) GetRemaining() string { + if x != nil { + return x.Remaining + } + return "" +} + +type StatusResponse_Memory struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Total uint64 `protobuf:"varint,1,opt,name=total,proto3" json:"total,omitempty"` + Used uint64 `protobuf:"varint,2,opt,name=used,proto3" json:"used,omitempty"` + Available uint64 `protobuf:"varint,3,opt,name=available,proto3" json:"available,omitempty"` + UsedPerc float64 `protobuf:"fixed64,4,opt,name=used_perc,json=usedPerc,proto3" json:"used_perc,omitempty"` +} + +func (x *StatusResponse_Memory) Reset() { + *x = StatusResponse_Memory{} + mi := &file_supernode_supernode_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *StatusResponse_Memory) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StatusResponse_Memory) ProtoMessage() {} + +func (x *StatusResponse_Memory) ProtoReflect() protoreflect.Message { + mi := &file_supernode_supernode_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StatusResponse_Memory.ProtoReflect.Descriptor instead. +func (*StatusResponse_Memory) Descriptor() ([]byte, []int) { + return file_supernode_supernode_proto_rawDescGZIP(), []int{1, 1} +} + +func (x *StatusResponse_Memory) GetTotal() uint64 { + if x != nil { + return x.Total + } + return 0 +} + +func (x *StatusResponse_Memory) GetUsed() uint64 { + if x != nil { + return x.Used + } + return 0 +} + +func (x *StatusResponse_Memory) GetAvailable() uint64 { + if x != nil { + return x.Available + } + return 0 +} + +func (x *StatusResponse_Memory) GetUsedPerc() float64 { + if x != nil { + return x.UsedPerc + } + return 0 +} + +// ServiceTasks contains task information for a specific service +type StatusResponse_ServiceTasks struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ServiceName string `protobuf:"bytes,1,opt,name=service_name,json=serviceName,proto3" json:"service_name,omitempty"` + TaskIds []string `protobuf:"bytes,2,rep,name=task_ids,json=taskIds,proto3" json:"task_ids,omitempty"` + TaskCount int32 `protobuf:"varint,3,opt,name=task_count,json=taskCount,proto3" json:"task_count,omitempty"` +} + +func (x *StatusResponse_ServiceTasks) Reset() { + *x = StatusResponse_ServiceTasks{} + mi := &file_supernode_supernode_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *StatusResponse_ServiceTasks) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StatusResponse_ServiceTasks) ProtoMessage() {} + +func (x *StatusResponse_ServiceTasks) ProtoReflect() protoreflect.Message { + mi := &file_supernode_supernode_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StatusResponse_ServiceTasks.ProtoReflect.Descriptor instead. +func (*StatusResponse_ServiceTasks) Descriptor() ([]byte, []int) { + return file_supernode_supernode_proto_rawDescGZIP(), []int{1, 2} +} + +func (x *StatusResponse_ServiceTasks) GetServiceName() string { + if x != nil { + return x.ServiceName + } + return "" +} + +func (x *StatusResponse_ServiceTasks) GetTaskIds() []string { + if x != nil { + return x.TaskIds + } + return nil +} + +func (x *StatusResponse_ServiceTasks) GetTaskCount() int32 { + if x != nil { + return x.TaskCount + } + return 0 +} + +var File_supernode_supernode_proto protoreflect.FileDescriptor + +var file_supernode_supernode_proto_rawDesc = []byte{ + 0x0a, 0x19, 0x73, 0x75, 0x70, 0x65, 0x72, 0x6e, 0x6f, 0x64, 0x65, 0x2f, 0x73, 0x75, 0x70, 0x65, + 0x72, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x09, 0x73, 0x75, 0x70, + 0x65, 0x72, 0x6e, 0x6f, 0x64, 0x65, 0x22, 0x0f, 0x0a, 0x0d, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x85, 0x04, 0x0a, 0x0e, 0x53, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2f, 0x0a, 0x03, 0x63, 0x70, + 0x75, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x73, 0x75, 0x70, 0x65, 0x72, 0x6e, + 0x6f, 0x64, 0x65, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x2e, 0x43, 0x50, 0x55, 0x52, 0x03, 0x63, 0x70, 0x75, 0x12, 0x38, 0x0a, 0x06, 0x6d, + 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x73, 0x75, + 0x70, 0x65, 0x72, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x4d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x52, 0x06, 0x6d, + 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x12, 0x42, 0x0a, 0x08, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, + 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x26, 0x2e, 0x73, 0x75, 0x70, 0x65, 0x72, 0x6e, + 0x6f, 0x64, 0x65, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x54, 0x61, 0x73, 0x6b, 0x73, 0x52, + 0x08, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x12, 0x2d, 0x0a, 0x12, 0x61, 0x76, 0x61, + 0x69, 0x6c, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x18, + 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, 0x11, 0x61, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x6c, 0x65, + 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x1a, 0x39, 0x0a, 0x03, 0x43, 0x50, 0x55, 0x12, + 0x14, 0x0a, 0x05, 0x75, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, + 0x75, 0x73, 0x61, 0x67, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x72, 0x65, 0x6d, 0x61, 0x69, 0x6e, 0x69, + 0x6e, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x6d, 0x61, 0x69, 0x6e, + 0x69, 0x6e, 0x67, 0x1a, 0x6d, 0x0a, 0x06, 0x4d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x12, 0x14, 0x0a, + 0x05, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x74, 0x6f, + 0x74, 0x61, 0x6c, 0x12, 0x12, 0x0a, 0x04, 0x75, 0x73, 0x65, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x04, 0x52, 0x04, 0x75, 0x73, 0x65, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x61, 0x76, 0x61, 0x69, 0x6c, + 0x61, 0x62, 0x6c, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x61, 0x76, 0x61, 0x69, + 0x6c, 0x61, 0x62, 0x6c, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x75, 0x73, 0x65, 0x64, 0x5f, 0x70, 0x65, + 0x72, 0x63, 0x18, 0x04, 0x20, 0x01, 0x28, 0x01, 0x52, 0x08, 0x75, 0x73, 0x65, 0x64, 0x50, 0x65, + 0x72, 0x63, 0x1a, 0x6b, 0x0a, 0x0c, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x54, 0x61, 0x73, + 0x6b, 0x73, 0x12, 0x21, 0x0a, 0x0c, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x5f, 0x6e, 0x61, + 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, + 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, + 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x73, + 0x12, 0x1d, 0x0a, 0x0a, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x05, 0x52, 0x09, 0x74, 0x61, 0x73, 0x6b, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x32, + 0x54, 0x0a, 0x10, 0x53, 0x75, 0x70, 0x65, 0x72, 0x6e, 0x6f, 0x64, 0x65, 0x53, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x12, 0x40, 0x0a, 0x09, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, + 0x12, 0x18, 0x2e, 0x73, 0x75, 0x70, 0x65, 0x72, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x53, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x73, 0x75, 0x70, + 0x65, 0x72, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x33, 0x5a, 0x31, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, + 0x63, 0x6f, 0x6d, 0x2f, 0x4c, 0x75, 0x6d, 0x65, 0x72, 0x61, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, + 0x6f, 0x6c, 0x2f, 0x73, 0x75, 0x70, 0x65, 0x72, 0x6e, 0x6f, 0x64, 0x65, 0x2f, 0x67, 0x65, 0x6e, + 0x2f, 0x73, 0x75, 0x70, 0x65, 0x72, 0x6e, 0x6f, 0x64, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, +} + +var ( + file_supernode_supernode_proto_rawDescOnce sync.Once + file_supernode_supernode_proto_rawDescData = file_supernode_supernode_proto_rawDesc +) + +func file_supernode_supernode_proto_rawDescGZIP() []byte { + file_supernode_supernode_proto_rawDescOnce.Do(func() { + file_supernode_supernode_proto_rawDescData = protoimpl.X.CompressGZIP(file_supernode_supernode_proto_rawDescData) + }) + return file_supernode_supernode_proto_rawDescData +} + +var file_supernode_supernode_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_supernode_supernode_proto_goTypes = []any{ + (*StatusRequest)(nil), // 0: supernode.StatusRequest + (*StatusResponse)(nil), // 1: supernode.StatusResponse + (*StatusResponse_CPU)(nil), // 2: supernode.StatusResponse.CPU + (*StatusResponse_Memory)(nil), // 3: supernode.StatusResponse.Memory + (*StatusResponse_ServiceTasks)(nil), // 4: supernode.StatusResponse.ServiceTasks +} +var file_supernode_supernode_proto_depIdxs = []int32{ + 2, // 0: supernode.StatusResponse.cpu:type_name -> supernode.StatusResponse.CPU + 3, // 1: supernode.StatusResponse.memory:type_name -> supernode.StatusResponse.Memory + 4, // 2: supernode.StatusResponse.services:type_name -> supernode.StatusResponse.ServiceTasks + 0, // 3: supernode.SupernodeService.GetStatus:input_type -> supernode.StatusRequest + 1, // 4: supernode.SupernodeService.GetStatus:output_type -> supernode.StatusResponse + 4, // [4:5] is the sub-list for method output_type + 3, // [3:4] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name +} + +func init() { file_supernode_supernode_proto_init() } +func file_supernode_supernode_proto_init() { + if File_supernode_supernode_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_supernode_supernode_proto_rawDesc, + NumEnums: 0, + NumMessages: 5, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_supernode_supernode_proto_goTypes, + DependencyIndexes: file_supernode_supernode_proto_depIdxs, + MessageInfos: file_supernode_supernode_proto_msgTypes, + }.Build() + File_supernode_supernode_proto = out.File + file_supernode_supernode_proto_rawDesc = nil + file_supernode_supernode_proto_goTypes = nil + file_supernode_supernode_proto_depIdxs = nil +} diff --git a/gen/supernode/supernode_grpc.pb.go b/gen/supernode/supernode_grpc.pb.go new file mode 100644 index 00000000..783c3f8c --- /dev/null +++ b/gen/supernode/supernode_grpc.pb.go @@ -0,0 +1,125 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.5.1 +// - protoc v3.21.12 +// source: supernode/supernode.proto + +package supernode + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + SupernodeService_GetStatus_FullMethodName = "/supernode.SupernodeService/GetStatus" +) + +// SupernodeServiceClient is the client API for SupernodeService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +// +// SupernodeService provides status information for all services +type SupernodeServiceClient interface { + GetStatus(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error) +} + +type supernodeServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewSupernodeServiceClient(cc grpc.ClientConnInterface) SupernodeServiceClient { + return &supernodeServiceClient{cc} +} + +func (c *supernodeServiceClient) GetStatus(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(StatusResponse) + err := c.cc.Invoke(ctx, SupernodeService_GetStatus_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +// SupernodeServiceServer is the server API for SupernodeService service. +// All implementations must embed UnimplementedSupernodeServiceServer +// for forward compatibility. +// +// SupernodeService provides status information for all services +type SupernodeServiceServer interface { + GetStatus(context.Context, *StatusRequest) (*StatusResponse, error) + mustEmbedUnimplementedSupernodeServiceServer() +} + +// UnimplementedSupernodeServiceServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedSupernodeServiceServer struct{} + +func (UnimplementedSupernodeServiceServer) GetStatus(context.Context, *StatusRequest) (*StatusResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetStatus not implemented") +} +func (UnimplementedSupernodeServiceServer) mustEmbedUnimplementedSupernodeServiceServer() {} +func (UnimplementedSupernodeServiceServer) testEmbeddedByValue() {} + +// UnsafeSupernodeServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to SupernodeServiceServer will +// result in compilation errors. +type UnsafeSupernodeServiceServer interface { + mustEmbedUnimplementedSupernodeServiceServer() +} + +func RegisterSupernodeServiceServer(s grpc.ServiceRegistrar, srv SupernodeServiceServer) { + // If the following call pancis, it indicates UnimplementedSupernodeServiceServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&SupernodeService_ServiceDesc, srv) +} + +func _SupernodeService_GetStatus_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(StatusRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SupernodeServiceServer).GetStatus(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: SupernodeService_GetStatus_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SupernodeServiceServer).GetStatus(ctx, req.(*StatusRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// SupernodeService_ServiceDesc is the grpc.ServiceDesc for SupernodeService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var SupernodeService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "supernode.SupernodeService", + HandlerType: (*SupernodeServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "GetStatus", + Handler: _SupernodeService_GetStatus_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "supernode/supernode.proto", +} diff --git a/proto/supernode/action/cascade/service.proto b/proto/supernode/action/cascade/service.proto index 23be1d35..e8cb9a97 100644 --- a/proto/supernode/action/cascade/service.proto +++ b/proto/supernode/action/cascade/service.proto @@ -4,7 +4,6 @@ option go_package = "github.com/LumeraProtocol/supernode/gen/supernode/action/ca service CascadeService { rpc Register (stream RegisterRequest) returns (stream RegisterResponse); - rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse); rpc Download (DownloadRequest) returns (stream DownloadResponse); } @@ -62,23 +61,3 @@ enum SupernodeEventType { ARTEFACTS_DOWNLOADED = 12; } -message HealthCheckRequest {} - -// The HealthCheckResponse represents system health status. -message HealthCheckResponse { - message CPU { - string usage = 1; - string remaining = 2; - } - - message Memory { - uint64 total = 1; - uint64 used = 2; - uint64 available = 3; - double used_perc = 4; - } - - CPU cpu = 1; - Memory memory = 2; - repeated string tasks_in_progress = 3; -} \ No newline at end of file diff --git a/proto/supernode/supernode.proto b/proto/supernode/supernode.proto new file mode 100644 index 00000000..10d72990 --- /dev/null +++ b/proto/supernode/supernode.proto @@ -0,0 +1,37 @@ +syntax = "proto3"; +package supernode; +option go_package = "github.com/LumeraProtocol/supernode/gen/supernode"; + +// SupernodeService provides status information for all services +service SupernodeService { + rpc GetStatus(StatusRequest) returns (StatusResponse); +} + +message StatusRequest {} + +// The StatusResponse represents system status. +message StatusResponse { + message CPU { + string usage = 1; + string remaining = 2; + } + + message Memory { + uint64 total = 1; + uint64 used = 2; + uint64 available = 3; + double used_perc = 4; + } + + // ServiceTasks contains task information for a specific service + message ServiceTasks { + string service_name = 1; + repeated string task_ids = 2; + int32 task_count = 3; + } + + CPU cpu = 1; + Memory memory = 2; + repeated ServiceTasks services = 3; + repeated string available_services = 4; +} \ No newline at end of file diff --git a/sdk/adapters/supernodeservice/adapter.go b/sdk/adapters/supernodeservice/adapter.go index f88e2129..fd0cfb8d 100644 --- a/sdk/adapters/supernodeservice/adapter.go +++ b/sdk/adapters/supernodeservice/adapter.go @@ -7,6 +7,7 @@ import ( "os" "path/filepath" + "github.com/LumeraProtocol/supernode/gen/supernode" "github.com/LumeraProtocol/supernode/gen/supernode/action/cascade" "github.com/LumeraProtocol/supernode/pkg/net" "github.com/LumeraProtocol/supernode/sdk/event" @@ -16,11 +17,12 @@ import ( ) type cascadeAdapter struct { - client cascade.CascadeServiceClient - logger log.Logger + client cascade.CascadeServiceClient + statusClient supernode.SupernodeServiceClient + logger log.Logger } -func NewCascadeAdapter(ctx context.Context, client cascade.CascadeServiceClient, logger log.Logger) CascadeServiceClient { +func NewCascadeAdapter(ctx context.Context, conn *grpc.ClientConn, logger log.Logger) CascadeServiceClient { if logger == nil { logger = log.NewNoopLogger() } @@ -28,8 +30,9 @@ func NewCascadeAdapter(ctx context.Context, client cascade.CascadeServiceClient, logger.Debug(ctx, "Creating cascade service adapter") return &cascadeAdapter{ - client: client, - logger: logger, + client: cascade.NewCascadeServiceClient(conn), + statusClient: supernode.NewSupernodeServiceClient(conn), + logger: logger, } } @@ -166,7 +169,7 @@ func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *Casca } func (a *cascadeAdapter) GetSupernodeStatus(ctx context.Context) (SupernodeStatusresponse, error) { - resp, err := a.client.HealthCheck(ctx, &cascade.HealthCheckRequest{}) + resp, err := a.statusClient.GetStatus(ctx, &supernode.StatusRequest{}) if err != nil { a.logger.Error(ctx, "Failed to get supernode status", "error", err) return SupernodeStatusresponse{}, fmt.Errorf("failed to get supernode status: %w", err) @@ -296,10 +299,8 @@ func toSdkEvent(e cascade.SupernodeEventType) event.EventType { } } -func toSdkSupernodeStatus(resp *cascade.HealthCheckResponse) *SupernodeStatusresponse { - result := &SupernodeStatusresponse{ - TasksInProgress: resp.TasksInProgress, - } +func toSdkSupernodeStatus(resp *supernode.StatusResponse) *SupernodeStatusresponse { + result := &SupernodeStatusresponse{} // Convert CPU data if resp.Cpu != nil { @@ -315,5 +316,11 @@ func toSdkSupernodeStatus(resp *cascade.HealthCheckResponse) *SupernodeStatusres result.Memory.UsedPerc = resp.Memory.UsedPerc } + // Aggregate all task IDs from all services + result.TasksInProgress = make([]string, 0) + for _, service := range resp.Services { + result.TasksInProgress = append(result.TasksInProgress, service.TaskIds...) + } + return result } diff --git a/sdk/net/impl.go b/sdk/net/impl.go index 68d6e226..98b68338 100644 --- a/sdk/net/impl.go +++ b/sdk/net/impl.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/LumeraProtocol/lumera/x/lumeraid/securekeyx" - "github.com/LumeraProtocol/supernode/gen/supernode/action/cascade" ltc "github.com/LumeraProtocol/supernode/pkg/net/credentials" "github.com/LumeraProtocol/supernode/pkg/net/credentials/alts/conn" "github.com/LumeraProtocol/supernode/pkg/net/grpc/client" @@ -93,7 +92,7 @@ func NewSupernodeClient(ctx context.Context, logger log.Logger, keyring keyring. // Create service clients cascadeClient := supernodeservice.NewCascadeAdapter( ctx, - cascade.NewCascadeServiceClient(conn), + conn, logger, ) diff --git a/supernode/cmd/start.go b/supernode/cmd/start.go index b7e7bbd6..8b3a206c 100644 --- a/supernode/cmd/start.go +++ b/supernode/cmd/start.go @@ -103,6 +103,11 @@ The supernode will connect to the Lumera network and begin participating in the // Create cascade action server cascadeActionServer := cascade.NewCascadeActionServer(cService) + // Create supernode status service + statusService := common.NewSupernodeStatusService() + statusService.RegisterTaskProvider(cService) + supernodeServer := server.NewSupernodeServer(statusService) + // Configure server serverConfig := &server.Config{ @@ -117,6 +122,7 @@ The supernode will connect to the Lumera network and begin participating in the kr, lumeraClient, cascadeActionServer, + supernodeServer, ) if err != nil { return fmt.Errorf("failed to create gRPC server: %w", err) diff --git a/supernode/node/action/server/cascade/cascade_action_server.go b/supernode/node/action/server/cascade/cascade_action_server.go index 238b53d6..5865279e 100644 --- a/supernode/node/action/server/cascade/cascade_action_server.go +++ b/supernode/node/action/server/cascade/cascade_action_server.go @@ -1,7 +1,6 @@ package cascade import ( - "context" "encoding/hex" "fmt" "io" @@ -162,28 +161,6 @@ func (server *ActionServer) Register(stream pb.CascadeService_RegisterServer) er return nil } -func (server *ActionServer) HealthCheck(ctx context.Context, _ *pb.HealthCheckRequest) (*pb.HealthCheckResponse, error) { - resp, err := server.factory.NewCascadeRegistrationTask().HealthCheck(ctx) - if err != nil { - logtrace.Error(ctx, "error retrieving health-check metrics for supernode", logtrace.Fields{}) - return nil, err - } - - return &pb.HealthCheckResponse{ - Cpu: &pb.HealthCheckResponse_CPU{ - Usage: resp.CPU.Usage, - Remaining: resp.CPU.Remaining, - }, - Memory: &pb.HealthCheckResponse_Memory{ - Total: resp.Memory.Total, - Used: resp.Memory.Used, - Available: resp.Memory.Available, - UsedPerc: resp.Memory.UsedPerc, - }, - TasksInProgress: resp.TasksInProgress, - }, nil -} - func (server *ActionServer) Download(req *pb.DownloadRequest, stream pb.CascadeService_DownloadServer) error { fields := logtrace.Fields{ logtrace.FieldMethod: "Download", diff --git a/supernode/node/supernode/server/status_server.go b/supernode/node/supernode/server/status_server.go new file mode 100644 index 00000000..c68a4728 --- /dev/null +++ b/supernode/node/supernode/server/status_server.go @@ -0,0 +1,65 @@ +package server + +import ( + "context" + + "google.golang.org/grpc" + + pb "github.com/LumeraProtocol/supernode/gen/supernode" + "github.com/LumeraProtocol/supernode/supernode/services/common" +) + +// SupernodeServer implements the SupernodeService gRPC service +type SupernodeServer struct { + pb.UnimplementedSupernodeServiceServer + statusService *common.SupernodeStatusService +} + +// NewSupernodeServer creates a new SupernodeServer +func NewSupernodeServer(statusService *common.SupernodeStatusService) *SupernodeServer { + return &SupernodeServer{ + statusService: statusService, + } +} + +// GetStatus implements SupernodeService.GetStatus +func (s *SupernodeServer) GetStatus(ctx context.Context, req *pb.StatusRequest) (*pb.StatusResponse, error) { + // Get status from the common service + status, err := s.statusService.GetStatus(ctx) + if err != nil { + return nil, err + } + + // Convert to protobuf response + response := &pb.StatusResponse{ + Cpu: &pb.StatusResponse_CPU{ + Usage: status.CPU.Usage, + Remaining: status.CPU.Remaining, + }, + Memory: &pb.StatusResponse_Memory{ + Total: status.Memory.Total, + Used: status.Memory.Used, + Available: status.Memory.Available, + UsedPerc: status.Memory.UsedPerc, + }, + Services: make([]*pb.StatusResponse_ServiceTasks, 0, len(status.Services)), + AvailableServices: status.AvailableServices, + } + + // Convert service tasks + for _, service := range status.Services { + serviceTask := &pb.StatusResponse_ServiceTasks{ + ServiceName: service.ServiceName, + TaskIds: service.TaskIDs, + TaskCount: service.TaskCount, + } + response.Services = append(response.Services, serviceTask) + } + + return response, nil +} + +// Desc implements the service interface for gRPC service registration +func (s *SupernodeServer) Desc() *grpc.ServiceDesc { + return &pb.SupernodeService_ServiceDesc +} diff --git a/supernode/node/supernode/server/status_server_test.go b/supernode/node/supernode/server/status_server_test.go new file mode 100644 index 00000000..133f38ff --- /dev/null +++ b/supernode/node/supernode/server/status_server_test.go @@ -0,0 +1,80 @@ +package server + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + pb "github.com/LumeraProtocol/supernode/gen/supernode" + "github.com/LumeraProtocol/supernode/supernode/services/common" +) + +func TestSupernodeServer_GetStatus(t *testing.T) { + ctx := context.Background() + + // Create status service + statusService := common.NewSupernodeStatusService() + + // Create server + server := NewSupernodeServer(statusService) + + // Test with empty service + resp, err := server.GetStatus(ctx, &pb.StatusRequest{}) + require.NoError(t, err) + assert.NotNil(t, resp) + + // Check basic structure + assert.NotNil(t, resp.Cpu) + assert.NotNil(t, resp.Memory) + assert.NotEmpty(t, resp.Cpu.Usage) + assert.NotEmpty(t, resp.Cpu.Remaining) + assert.True(t, resp.Memory.Total > 0) + + // Should have no services initially + assert.Empty(t, resp.Services) + assert.Empty(t, resp.AvailableServices) +} + +func TestSupernodeServer_GetStatusWithService(t *testing.T) { + ctx := context.Background() + + // Create status service + statusService := common.NewSupernodeStatusService() + + // Add a mock task provider + mockProvider := &common.MockTaskProvider{ + ServiceName: "test-service", + TaskIDs: []string{"task1", "task2"}, + } + statusService.RegisterTaskProvider(mockProvider) + + // Create server + server := NewSupernodeServer(statusService) + + // Test with service + resp, err := server.GetStatus(ctx, &pb.StatusRequest{}) + require.NoError(t, err) + assert.NotNil(t, resp) + + // Should have one service + assert.Len(t, resp.Services, 1) + assert.Len(t, resp.AvailableServices, 1) + assert.Equal(t, []string{"test-service"}, resp.AvailableServices) + + // Check service details + service := resp.Services[0] + assert.Equal(t, "test-service", service.ServiceName) + assert.Equal(t, int32(2), service.TaskCount) + assert.Equal(t, []string{"task1", "task2"}, service.TaskIds) +} + +func TestSupernodeServer_Desc(t *testing.T) { + statusService := common.NewSupernodeStatusService() + server := NewSupernodeServer(statusService) + + desc := server.Desc() + assert.NotNil(t, desc) + assert.Equal(t, "supernode.SupernodeService", desc.ServiceName) +} diff --git a/supernode/services/cascade/healthcheck.go b/supernode/services/cascade/healthcheck.go index 422494fe..3c0d8859 100644 --- a/supernode/services/cascade/healthcheck.go +++ b/supernode/services/cascade/healthcheck.go @@ -2,67 +2,19 @@ package cascade import ( "context" - "fmt" - "time" - "github.com/LumeraProtocol/supernode/pkg/logtrace" - "github.com/shirou/gopsutil/v3/cpu" - "github.com/shirou/gopsutil/v3/mem" + "github.com/LumeraProtocol/supernode/supernode/services/common" ) -type HealthCheckResponse struct { - CPU struct { - Usage string - Remaining string - } - Memory struct { - Total uint64 - Used uint64 - Available uint64 - UsedPerc float64 - } - TasksInProgress []string -} +// HealthCheckResponse represents the health check response for cascade service +type HealthCheckResponse = common.StatusResponse +// HealthCheck delegates to the common supernode status service func (task *CascadeRegistrationTask) HealthCheck(ctx context.Context) (HealthCheckResponse, error) { - fields := logtrace.Fields{ - logtrace.FieldMethod: "HealthCheck", - logtrace.FieldModule: "CascadeActionServer", - } - logtrace.Info(ctx, "healthcheck request received", fields) - - var resp HealthCheckResponse - - percentages, err := cpu.Percent(time.Second, false) - if err != nil { - logtrace.Error(ctx, "failed to get cpu info", logtrace.Fields{logtrace.FieldError: err.Error()}) - return resp, err - } - fmt.Println(percentages) - usage := percentages[0] - remaining := 100 - usage - - // Set CPU values in response - resp.CPU.Usage = fmt.Sprintf("%.2f", usage) - resp.CPU.Remaining = fmt.Sprintf("%.2f", remaining) - - // Memory stats - vmem, err := mem.VirtualMemory() - if err != nil { - logtrace.Error(ctx, "failed to get memory info", logtrace.Fields{logtrace.FieldError: err.Error()}) - return resp, err - } - resp.Memory.Total = vmem.Total - resp.Memory.Used = vmem.Used - resp.Memory.Available = vmem.Available - resp.Memory.UsedPerc = vmem.UsedPercent - - // Tasks - for _, t := range task.Worker.Tasks() { - resp.TasksInProgress = append(resp.TasksInProgress, t.ID()) - } - - logtrace.Info(ctx, "top-style healthcheck data", logtrace.Fields{"cpu_usage": fmt.Sprintf("%.2f", usage), "cpu_remaining": fmt.Sprintf("%.2f", remaining), "mem_total": resp.Memory.Total, "mem_used": resp.Memory.Used, "mem_used%": resp.Memory.UsedPerc, "task_count": len(resp.TasksInProgress)}) + // Create a status service and register the cascade service as a task provider + statusService := common.NewSupernodeStatusService() + statusService.RegisterTaskProvider(task.CascadeService) - return resp, nil + // Get the status from the common service + return statusService.GetStatus(ctx) } diff --git a/supernode/services/cascade/healthcheck_test.go b/supernode/services/cascade/healthcheck_test.go index 973136d5..e4cdaed1 100644 --- a/supernode/services/cascade/healthcheck_test.go +++ b/supernode/services/cascade/healthcheck_test.go @@ -81,8 +81,28 @@ func TestHealthCheck(t *testing.T) { assert.True(t, resp.Memory.Used <= resp.Memory.Total) assert.True(t, resp.Memory.UsedPerc >= 0 && resp.Memory.UsedPerc <= 100) - // Task count check - assert.Equal(t, tt.expectTasks, len(resp.TasksInProgress)) + // Available services check + assert.Contains(t, resp.AvailableServices, "cascade") + + // Task count check - look for cascade service in the services list + var cascadeService *common.ServiceTasks + for _, service := range resp.Services { + if service.ServiceName == "cascade" { + cascadeService = &service + break + } + } + + if tt.expectTasks > 0 { + assert.NotNil(t, cascadeService, "cascade service should be present") + assert.Equal(t, tt.expectTasks, int(cascadeService.TaskCount)) + assert.Equal(t, tt.expectTasks, len(cascadeService.TaskIDs)) + } else { + // If no tasks expected, either no cascade service or empty task count + if cascadeService != nil { + assert.Equal(t, 0, int(cascadeService.TaskCount)) + } + } }) } } diff --git a/supernode/services/cascade/service.go b/supernode/services/cascade/service.go index ae440dfd..f0cb1093 100644 --- a/supernode/services/cascade/service.go +++ b/supernode/services/cascade/service.go @@ -32,6 +32,20 @@ func (service *CascadeService) Run(ctx context.Context) error { return service.RunHelper(ctx, service.config.SupernodeAccountAddress, logPrefix) } +// GetServiceName returns the name of the cascade service +func (service *CascadeService) GetServiceName() string { + return "cascade" +} + +// GetRunningTasks returns a list of currently running task IDs +func (service *CascadeService) GetRunningTasks() []string { + var taskIDs []string + for _, task := range service.Worker.Tasks() { + taskIDs = append(taskIDs, task.ID()) + } + return taskIDs +} + // NewCascadeService returns a new CascadeService instance func NewCascadeService(config *Config, lumera lumera.Client, p2pClient p2p.Client, codec codec.Codec, rqstore rqstore.Store) *CascadeService { return &CascadeService{ diff --git a/supernode/services/common/supernode_status.go b/supernode/services/common/supernode_status.go new file mode 100644 index 00000000..631ebd58 --- /dev/null +++ b/supernode/services/common/supernode_status.go @@ -0,0 +1,125 @@ +package common + +import ( + "context" + "fmt" + "time" + + "github.com/LumeraProtocol/supernode/pkg/logtrace" + "github.com/shirou/gopsutil/v3/cpu" + "github.com/shirou/gopsutil/v3/mem" +) + +// StatusResponse represents system status +type StatusResponse struct { + CPU struct { + Usage string + Remaining string + } + Memory struct { + Total uint64 + Used uint64 + Available uint64 + UsedPerc float64 + } + Services []ServiceTasks + AvailableServices []string +} + +// ServiceTasks contains task information for a specific service +type ServiceTasks struct { + ServiceName string + TaskIDs []string + TaskCount int32 +} + +// TaskProvider interface for services to provide their running tasks +type TaskProvider interface { + GetServiceName() string + GetRunningTasks() []string +} + +// SupernodeStatusService provides centralized status information +type SupernodeStatusService struct { + taskProviders []TaskProvider +} + +// NewSupernodeStatusService creates a new supernode status service +func NewSupernodeStatusService() *SupernodeStatusService { + return &SupernodeStatusService{ + taskProviders: make([]TaskProvider, 0), + } +} + +// RegisterTaskProvider registers a service as a task provider +func (s *SupernodeStatusService) RegisterTaskProvider(provider TaskProvider) { + s.taskProviders = append(s.taskProviders, provider) +} + +// GetStatus returns the current system status including all registered services +func (s *SupernodeStatusService) GetStatus(ctx context.Context) (StatusResponse, error) { + fields := logtrace.Fields{ + logtrace.FieldMethod: "GetStatus", + logtrace.FieldModule: "SupernodeStatusService", + } + logtrace.Info(ctx, "status request received", fields) + + var resp StatusResponse + + // Get CPU information + percentages, err := cpu.Percent(time.Second, false) + if err != nil { + logtrace.Error(ctx, "failed to get cpu info", logtrace.Fields{logtrace.FieldError: err.Error()}) + return resp, err + } + + usage := percentages[0] + remaining := 100 - usage + resp.CPU.Usage = fmt.Sprintf("%.2f", usage) + resp.CPU.Remaining = fmt.Sprintf("%.2f", remaining) + + // Get Memory information + vmem, err := mem.VirtualMemory() + if err != nil { + logtrace.Error(ctx, "failed to get memory info", logtrace.Fields{logtrace.FieldError: err.Error()}) + return resp, err + } + resp.Memory.Total = vmem.Total + resp.Memory.Used = vmem.Used + resp.Memory.Available = vmem.Available + resp.Memory.UsedPerc = vmem.UsedPercent + + // Get service information from all registered providers + resp.Services = make([]ServiceTasks, 0, len(s.taskProviders)) + resp.AvailableServices = make([]string, 0, len(s.taskProviders)) + + for _, provider := range s.taskProviders { + serviceName := provider.GetServiceName() + tasks := provider.GetRunningTasks() + + serviceTask := ServiceTasks{ + ServiceName: serviceName, + TaskIDs: tasks, + TaskCount: int32(len(tasks)), + } + resp.Services = append(resp.Services, serviceTask) + resp.AvailableServices = append(resp.AvailableServices, serviceName) + } + + totalTasks := 0 + for _, service := range resp.Services { + totalTasks += int(service.TaskCount) + } + + logtrace.Info(ctx, "status data collected", logtrace.Fields{ + "cpu_usage": fmt.Sprintf("%.2f", usage), + "cpu_remaining": fmt.Sprintf("%.2f", remaining), + "mem_total": resp.Memory.Total, + "mem_used": resp.Memory.Used, + "mem_used%": resp.Memory.UsedPerc, + "service_count": len(resp.Services), + "total_tasks": totalTasks, + }) + + return resp, nil +} diff --git a/supernode/services/common/supernode_status_test.go b/supernode/services/common/supernode_status_test.go new file mode 100644 index 00000000..6391e104 --- /dev/null +++ b/supernode/services/common/supernode_status_test.go @@ -0,0 +1,118 @@ +package common + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSupernodeStatusService(t *testing.T) { + ctx := context.Background() + + t.Run("empty service", func(t *testing.T) { + statusService := NewSupernodeStatusService() + + resp, err := statusService.GetStatus(ctx) + assert.NoError(t, err) + + // Should have CPU and Memory info + assert.NotEmpty(t, resp.CPU.Usage) + assert.NotEmpty(t, resp.CPU.Remaining) + assert.True(t, resp.Memory.Total > 0) + + // Should have empty services list + assert.Empty(t, resp.Services) + assert.Empty(t, resp.AvailableServices) + }) + + t.Run("single service with tasks", func(t *testing.T) { + statusService := NewSupernodeStatusService() + + // Register a mock task provider + mockProvider := &MockTaskProvider{ + ServiceName: "test-service", + TaskIDs: []string{"task1", "task2", "task3"}, + } + statusService.RegisterTaskProvider(mockProvider) + + resp, err := statusService.GetStatus(ctx) + assert.NoError(t, err) + + // Should have one service + assert.Len(t, resp.Services, 1) + assert.Len(t, resp.AvailableServices, 1) + assert.Equal(t, []string{"test-service"}, resp.AvailableServices) + + service := resp.Services[0] + assert.Equal(t, "test-service", service.ServiceName) + assert.Equal(t, int32(3), service.TaskCount) + assert.Equal(t, []string{"task1", "task2", "task3"}, service.TaskIDs) + }) + + t.Run("multiple services", func(t *testing.T) { + statusService := NewSupernodeStatusService() + + // Register multiple mock task providers + cascadeProvider := &MockTaskProvider{ + ServiceName: "cascade", + TaskIDs: []string{"cascade1", "cascade2"}, + } + senseProvider := &MockTaskProvider{ + ServiceName: "sense", + TaskIDs: []string{"sense1"}, + } + + statusService.RegisterTaskProvider(cascadeProvider) + statusService.RegisterTaskProvider(senseProvider) + + resp, err := statusService.GetStatus(ctx) + assert.NoError(t, err) + + // Should have two services + assert.Len(t, resp.Services, 2) + assert.Len(t, resp.AvailableServices, 2) + assert.Contains(t, resp.AvailableServices, "cascade") + assert.Contains(t, resp.AvailableServices, "sense") + + // Check services are present + serviceMap := make(map[string]ServiceTasks) + for _, service := range resp.Services { + serviceMap[service.ServiceName] = service + } + + cascade, ok := serviceMap["cascade"] + assert.True(t, ok) + assert.Equal(t, int32(2), cascade.TaskCount) + assert.Equal(t, []string{"cascade1", "cascade2"}, cascade.TaskIDs) + + sense, ok := serviceMap["sense"] + assert.True(t, ok) + assert.Equal(t, int32(1), sense.TaskCount) + assert.Equal(t, []string{"sense1"}, sense.TaskIDs) + }) + + t.Run("service with no tasks", func(t *testing.T) { + statusService := NewSupernodeStatusService() + + // Register a mock task provider with no tasks + mockProvider := &MockTaskProvider{ + ServiceName: "empty-service", + TaskIDs: []string{}, + } + statusService.RegisterTaskProvider(mockProvider) + + resp, err := statusService.GetStatus(ctx) + assert.NoError(t, err) + + // Should have one service + assert.Len(t, resp.Services, 1) + assert.Len(t, resp.AvailableServices, 1) + assert.Equal(t, []string{"empty-service"}, resp.AvailableServices) + + service := resp.Services[0] + assert.Equal(t, "empty-service", service.ServiceName) + assert.Equal(t, int32(0), service.TaskCount) + assert.Empty(t, service.TaskIDs) + }) +} diff --git a/supernode/services/common/test_helpers.go b/supernode/services/common/test_helpers.go new file mode 100644 index 00000000..c49b940a --- /dev/null +++ b/supernode/services/common/test_helpers.go @@ -0,0 +1,15 @@ +package common + +// MockTaskProvider for testing (exported for use in other packages) +type MockTaskProvider struct { + ServiceName string + TaskIDs []string +} + +func (m *MockTaskProvider) GetServiceName() string { + return m.ServiceName +} + +func (m *MockTaskProvider) GetRunningTasks() []string { + return m.TaskIDs +} From 497c916d9ddf042c1377353b171e5cf3a488b227 Mon Sep 17 00:00:00 2001 From: j-rafique Date: Mon, 7 Jul 2025 10:32:36 +0500 Subject: [PATCH 2/2] Refactor supernode services: consolidate status and task management - Removed redundant test files for supernode status and task services. - Implemented a new centralized SupernodeStatusService to manage system metrics and task information. - Introduced MetricsCollector for CPU and memory monitoring. - Created a unified StatusResponse structure to encapsulate service status details. - Developed a StorageHandler for P2P operations, including file storage and symbol management. - Enhanced task status management with a new Status type and associated methods. - Updated tests to reflect the new structure and ensure comprehensive coverage of service functionalities. --- sdk/adapters/supernodeservice/adapter.go | 14 ++- sdk/adapters/supernodeservice/types.go | 10 +- supernode/cmd/start.go | 3 +- .../server/cascade/cascade_action_server.go | 6 +- .../cascade/cascade_action_server_test.go | 10 +- .../node/supernode/server/status_server.go | 6 +- .../supernode/server/status_server_test.go | 7 +- .../cascade/adaptors/mocks/rq_mock.go | 10 +- supernode/services/cascade/adaptors/p2p.go | 6 +- supernode/services/cascade/download.go | 11 +- supernode/services/cascade/healthcheck.go | 20 ---- supernode/services/cascade/interfaces.go | 16 +-- .../cascade/mocks/cascade_interfaces_mock.go | 104 ++++++++++-------- supernode/services/cascade/service.go | 13 ++- supernode/services/cascade/status.go | 25 +++++ .../{healthcheck_test.go => status_test.go} | 22 ++-- supernode/services/cascade/task.go | 12 +- .../{service.go => base/supernode_service.go} | 2 +- .../common/{ => base}/supernode_task.go | 5 +- .../common/{ => base}/supernode_task_test.go | 9 +- .../handler.go} | 2 +- .../handler_test.go} | 10 +- .../services/common/supernode/metrics.go | 46 ++++++++ .../service.go} | 81 +++++--------- .../service_test.go} | 11 +- supernode/services/common/supernode/types.go | 35 ++++++ .../common/{status.go => task_status.go} | 0 .../{status_test.go => task_status_test.go} | 0 28 files changed, 294 insertions(+), 202 deletions(-) delete mode 100644 supernode/services/cascade/healthcheck.go create mode 100644 supernode/services/cascade/status.go rename supernode/services/cascade/{healthcheck_test.go => status_test.go} (79%) rename supernode/services/common/{service.go => base/supernode_service.go} (99%) rename supernode/services/common/{ => base}/supernode_task.go (92%) rename supernode/services/common/{ => base}/supernode_task_test.go (88%) rename supernode/services/common/{storage_handler.go => storage/handler.go} (99%) rename supernode/services/common/{storage_handler_test.go => storage/handler_test.go} (85%) create mode 100644 supernode/services/common/supernode/metrics.go rename supernode/services/common/{supernode_status.go => supernode/service.go} (52%) rename supernode/services/common/{supernode_status_test.go => supernode/service_test.go} (92%) create mode 100644 supernode/services/common/supernode/types.go rename supernode/services/common/{status.go => task_status.go} (100%) rename supernode/services/common/{status_test.go => task_status_test.go} (100%) diff --git a/sdk/adapters/supernodeservice/adapter.go b/sdk/adapters/supernodeservice/adapter.go index fd0cfb8d..34a5817f 100644 --- a/sdk/adapters/supernodeservice/adapter.go +++ b/sdk/adapters/supernodeservice/adapter.go @@ -316,11 +316,19 @@ func toSdkSupernodeStatus(resp *supernode.StatusResponse) *SupernodeStatusrespon result.Memory.UsedPerc = resp.Memory.UsedPerc } - // Aggregate all task IDs from all services - result.TasksInProgress = make([]string, 0) + // Convert Services data + result.Services = make([]ServiceTasks, 0, len(resp.Services)) for _, service := range resp.Services { - result.TasksInProgress = append(result.TasksInProgress, service.TaskIds...) + result.Services = append(result.Services, ServiceTasks{ + ServiceName: service.ServiceName, + TaskIDs: service.TaskIds, + TaskCount: service.TaskCount, + }) } + + // Convert AvailableServices data + result.AvailableServices = make([]string, len(resp.AvailableServices)) + copy(result.AvailableServices, resp.AvailableServices) return result } diff --git a/sdk/adapters/supernodeservice/types.go b/sdk/adapters/supernodeservice/types.go index 90c42040..79a67c6b 100644 --- a/sdk/adapters/supernodeservice/types.go +++ b/sdk/adapters/supernodeservice/types.go @@ -28,6 +28,13 @@ type CascadeSupernodeRegisterResponse struct { TxHash string } +// ServiceTasks contains task information for a specific service +type ServiceTasks struct { + ServiceName string + TaskIDs []string + TaskCount int32 +} + type SupernodeStatusresponse struct { CPU struct { Usage string @@ -39,7 +46,8 @@ type SupernodeStatusresponse struct { Available uint64 UsedPerc float64 } - TasksInProgress []string + Services []ServiceTasks + AvailableServices []string } type CascadeSupernodeDownloadRequest struct { ActionID string diff --git a/supernode/cmd/start.go b/supernode/cmd/start.go index 8b3a206c..3d5e72e1 100644 --- a/supernode/cmd/start.go +++ b/supernode/cmd/start.go @@ -21,6 +21,7 @@ import ( "github.com/LumeraProtocol/supernode/supernode/node/supernode/server" cascadeService "github.com/LumeraProtocol/supernode/supernode/services/cascade" "github.com/LumeraProtocol/supernode/supernode/services/common" + supernodeService "github.com/LumeraProtocol/supernode/supernode/services/common/supernode" cKeyring "github.com/cosmos/cosmos-sdk/crypto/keyring" "github.com/spf13/cobra" @@ -104,7 +105,7 @@ The supernode will connect to the Lumera network and begin participating in the cascadeActionServer := cascade.NewCascadeActionServer(cService) // Create supernode status service - statusService := common.NewSupernodeStatusService() + statusService := supernodeService.NewSupernodeStatusService() statusService.RegisterTaskProvider(cService) supernodeServer := server.NewSupernodeServer(statusService) diff --git a/supernode/node/action/server/cascade/cascade_action_server.go b/supernode/node/action/server/cascade/cascade_action_server.go index 5865279e..319be25a 100644 --- a/supernode/node/action/server/cascade/cascade_action_server.go +++ b/supernode/node/action/server/cascade/cascade_action_server.go @@ -16,11 +16,11 @@ import ( type ActionServer struct { pb.UnimplementedCascadeServiceServer - factory cascadeService.TaskFactory + factory cascadeService.CascadeServiceFactory } // NewCascadeActionServer creates a new CascadeActionServer with injected service -func NewCascadeActionServer(factory cascadeService.TaskFactory) *ActionServer { +func NewCascadeActionServer(factory cascadeService.CascadeServiceFactory) *ActionServer { return &ActionServer{factory: factory} } @@ -233,7 +233,7 @@ func (server *ActionServer) Download(req *pb.DownloadRequest, stream pb.CascadeS } } - err = task.DownloadCleanup(ctx, tmpDir) + err = task.CleanupDownload(ctx, tmpDir) if err != nil { logtrace.Error(ctx, "error cleaning up the tmp dir", logtrace.Fields{ logtrace.FieldError: err.Error(), diff --git a/supernode/node/action/server/cascade/cascade_action_server_test.go b/supernode/node/action/server/cascade/cascade_action_server_test.go index 46405ef4..621b6b97 100644 --- a/supernode/node/action/server/cascade/cascade_action_server_test.go +++ b/supernode/node/action/server/cascade/cascade_action_server_test.go @@ -17,8 +17,8 @@ func TestRegister_Success(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mockTask := cascademocks.NewMockRegistrationTaskService(ctrl) - mockFactory := cascademocks.NewMockTaskFactory(ctrl) + mockTask := cascademocks.NewMockCascadeTask(ctrl) + mockFactory := cascademocks.NewMockCascadeServiceFactory(ctrl) // Expect Register to be called with any input, respond via callback mockTask.EXPECT().Register(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( @@ -56,7 +56,7 @@ func TestRegister_Error_NoMetadata(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mockFactory := cascademocks.NewMockTaskFactory(ctrl) + mockFactory := cascademocks.NewMockCascadeServiceFactory(ctrl) server := NewCascadeActionServer(mockFactory) stream := &mockStream{ @@ -74,8 +74,8 @@ func TestRegister_Error_TaskFails(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mockTask := cascademocks.NewMockRegistrationTaskService(ctrl) - mockFactory := cascademocks.NewMockTaskFactory(ctrl) + mockTask := cascademocks.NewMockCascadeTask(ctrl) + mockFactory := cascademocks.NewMockCascadeServiceFactory(ctrl) mockTask.EXPECT().Register(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("task failed")).Times(1) mockFactory.EXPECT().NewCascadeRegistrationTask().Return(mockTask).Times(1) diff --git a/supernode/node/supernode/server/status_server.go b/supernode/node/supernode/server/status_server.go index c68a4728..91e172a4 100644 --- a/supernode/node/supernode/server/status_server.go +++ b/supernode/node/supernode/server/status_server.go @@ -6,17 +6,17 @@ import ( "google.golang.org/grpc" pb "github.com/LumeraProtocol/supernode/gen/supernode" - "github.com/LumeraProtocol/supernode/supernode/services/common" + "github.com/LumeraProtocol/supernode/supernode/services/common/supernode" ) // SupernodeServer implements the SupernodeService gRPC service type SupernodeServer struct { pb.UnimplementedSupernodeServiceServer - statusService *common.SupernodeStatusService + statusService *supernode.SupernodeStatusService } // NewSupernodeServer creates a new SupernodeServer -func NewSupernodeServer(statusService *common.SupernodeStatusService) *SupernodeServer { +func NewSupernodeServer(statusService *supernode.SupernodeStatusService) *SupernodeServer { return &SupernodeServer{ statusService: statusService, } diff --git a/supernode/node/supernode/server/status_server_test.go b/supernode/node/supernode/server/status_server_test.go index 133f38ff..99f3db1b 100644 --- a/supernode/node/supernode/server/status_server_test.go +++ b/supernode/node/supernode/server/status_server_test.go @@ -9,13 +9,14 @@ import ( pb "github.com/LumeraProtocol/supernode/gen/supernode" "github.com/LumeraProtocol/supernode/supernode/services/common" + "github.com/LumeraProtocol/supernode/supernode/services/common/supernode" ) func TestSupernodeServer_GetStatus(t *testing.T) { ctx := context.Background() // Create status service - statusService := common.NewSupernodeStatusService() + statusService := supernode.NewSupernodeStatusService() // Create server server := NewSupernodeServer(statusService) @@ -41,7 +42,7 @@ func TestSupernodeServer_GetStatusWithService(t *testing.T) { ctx := context.Background() // Create status service - statusService := common.NewSupernodeStatusService() + statusService := supernode.NewSupernodeStatusService() // Add a mock task provider mockProvider := &common.MockTaskProvider{ @@ -71,7 +72,7 @@ func TestSupernodeServer_GetStatusWithService(t *testing.T) { } func TestSupernodeServer_Desc(t *testing.T) { - statusService := common.NewSupernodeStatusService() + statusService := supernode.NewSupernodeStatusService() server := NewSupernodeServer(statusService) desc := server.Desc() diff --git a/supernode/services/cascade/adaptors/mocks/rq_mock.go b/supernode/services/cascade/adaptors/mocks/rq_mock.go index b51a7222..37659ccf 100644 --- a/supernode/services/cascade/adaptors/mocks/rq_mock.go +++ b/supernode/services/cascade/adaptors/mocks/rq_mock.go @@ -1,5 +1,10 @@ // Code generated by MockGen. DO NOT EDIT. // Source: rq.go +// +// Generated by this command: +// +// mockgen -destination=mocks/rq_mock.go -package=cascadeadaptormocks -source=rq.go +// // Package cascadeadaptormocks is a generated GoMock package. package cascadeadaptormocks @@ -16,6 +21,7 @@ import ( type MockCodecService struct { ctrl *gomock.Controller recorder *MockCodecServiceMockRecorder + isgomock struct{} } // MockCodecServiceMockRecorder is the mock recorder for MockCodecService. @@ -45,7 +51,7 @@ func (m *MockCodecService) Decode(ctx context.Context, req adaptors.DecodeReques } // Decode indicates an expected call of Decode. -func (mr *MockCodecServiceMockRecorder) Decode(ctx, req interface{}) *gomock.Call { +func (mr *MockCodecServiceMockRecorder) Decode(ctx, req any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Decode", reflect.TypeOf((*MockCodecService)(nil).Decode), ctx, req) } @@ -60,7 +66,7 @@ func (m *MockCodecService) EncodeInput(ctx context.Context, taskID, path string, } // EncodeInput indicates an expected call of EncodeInput. -func (mr *MockCodecServiceMockRecorder) EncodeInput(ctx, taskID, path, dataSize interface{}) *gomock.Call { +func (mr *MockCodecServiceMockRecorder) EncodeInput(ctx, taskID, path, dataSize any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EncodeInput", reflect.TypeOf((*MockCodecService)(nil).EncodeInput), ctx, taskID, path, dataSize) } diff --git a/supernode/services/cascade/adaptors/p2p.go b/supernode/services/cascade/adaptors/p2p.go index 6236830d..b2e2f719 100644 --- a/supernode/services/cascade/adaptors/p2p.go +++ b/supernode/services/cascade/adaptors/p2p.go @@ -15,7 +15,7 @@ import ( "github.com/LumeraProtocol/supernode/pkg/logtrace" "github.com/LumeraProtocol/supernode/pkg/storage/rqstore" "github.com/LumeraProtocol/supernode/pkg/utils" - "github.com/LumeraProtocol/supernode/supernode/services/common" + "github.com/LumeraProtocol/supernode/supernode/services/common/storage" "github.com/pkg/errors" ) @@ -71,7 +71,7 @@ func (p *p2pImpl) storeCascadeMetadata(ctx context.Context, metadataFiles [][]by "fileCount": len(metadataFiles), }) - return p.p2p.StoreBatch(ctx, metadataFiles, common.P2PDataCascadeMetadata, taskID) + return p.p2p.StoreBatch(ctx, metadataFiles, storage.P2PDataCascadeMetadata, taskID) } func (p *p2pImpl) storeCascadeSymbols(ctx context.Context, taskID, actionID string, symbolsDir string) error { @@ -155,7 +155,7 @@ func (c *p2pImpl) storeSymbolsInP2P(ctx context.Context, taskID, root string, fi return fmt.Errorf("load symbols: %w", err) } - if err := c.p2p.StoreBatch(ctx, symbols, common.P2PDataRaptorQSymbol, taskID); err != nil { + if err := c.p2p.StoreBatch(ctx, symbols, storage.P2PDataRaptorQSymbol, taskID); err != nil { return fmt.Errorf("p2p store batch: %w", err) } logtrace.Info(ctx, "stored batch symbols", logtrace.Fields{"count": len(symbols)}) diff --git a/supernode/services/cascade/download.go b/supernode/services/cascade/download.go index 94ce976c..fab26fba 100644 --- a/supernode/services/cascade/download.go +++ b/supernode/services/cascade/download.go @@ -190,13 +190,14 @@ func (task *CascadeRegistrationTask) streamDownloadEvent(eventType SupernodeEven return } -func (task *CascadeRegistrationTask) DownloadCleanup(ctx context.Context, symbolsDir string) error { - if symbolsDir == "" { - return errors.New("symbolsDir path is empty") +func (task *CascadeRegistrationTask) CleanupDownload(ctx context.Context, actionID string) error { + if actionID == "" { + return errors.New("actionID is empty") } - if err := os.RemoveAll(symbolsDir); err != nil { - return errors.Errorf("failed to delete symbols directory: %s, :%s", symbolsDir, err.Error()) + // For now, we use actionID as the directory path to maintain compatibility + if err := os.RemoveAll(actionID); err != nil { + return errors.Errorf("failed to delete download directory: %s, :%s", actionID, err.Error()) } return nil diff --git a/supernode/services/cascade/healthcheck.go b/supernode/services/cascade/healthcheck.go deleted file mode 100644 index 3c0d8859..00000000 --- a/supernode/services/cascade/healthcheck.go +++ /dev/null @@ -1,20 +0,0 @@ -package cascade - -import ( - "context" - - "github.com/LumeraProtocol/supernode/supernode/services/common" -) - -// HealthCheckResponse represents the health check response for cascade service -type HealthCheckResponse = common.StatusResponse - -// HealthCheck delegates to the common supernode status service -func (task *CascadeRegistrationTask) HealthCheck(ctx context.Context) (HealthCheckResponse, error) { - // Create a status service and register the cascade service as a task provider - statusService := common.NewSupernodeStatusService() - statusService.RegisterTaskProvider(task.CascadeService) - - // Get the status from the common service - return statusService.GetStatus(ctx) -} diff --git a/supernode/services/cascade/interfaces.go b/supernode/services/cascade/interfaces.go index 82c7002b..4885648f 100644 --- a/supernode/services/cascade/interfaces.go +++ b/supernode/services/cascade/interfaces.go @@ -2,19 +2,21 @@ package cascade import ( "context" + + "github.com/LumeraProtocol/supernode/supernode/services/common/supernode" ) -// TaskFactory defines an interface to create a new cascade registration task +// CascadeServiceFactory defines an interface to create cascade tasks // //go:generate mockgen -destination=mocks/cascade_interfaces_mock.go -package=cascademocks -source=interfaces.go -type TaskFactory interface { - NewCascadeRegistrationTask() RegistrationTaskService +type CascadeServiceFactory interface { + NewCascadeRegistrationTask() CascadeTask } -// RegistrationTaskService interface allows to register a new cascade -type RegistrationTaskService interface { +// CascadeTask interface defines operations for cascade registration and data management +type CascadeTask interface { Register(ctx context.Context, req *RegisterRequest, send func(resp *RegisterResponse) error) error - HealthCheck(ctx context.Context) (HealthCheckResponse, error) + GetStatus(ctx context.Context) (supernode.StatusResponse, error) Download(ctx context.Context, req *DownloadRequest, send func(resp *DownloadResponse) error) error - DownloadCleanup(ctx context.Context, actionID string) error + CleanupDownload(ctx context.Context, actionID string) error } diff --git a/supernode/services/cascade/mocks/cascade_interfaces_mock.go b/supernode/services/cascade/mocks/cascade_interfaces_mock.go index 4cd51c65..eae5b750 100644 --- a/supernode/services/cascade/mocks/cascade_interfaces_mock.go +++ b/supernode/services/cascade/mocks/cascade_interfaces_mock.go @@ -1,5 +1,10 @@ // Code generated by MockGen. DO NOT EDIT. // Source: interfaces.go +// +// Generated by this command: +// +// mockgen -destination=mocks/cascade_interfaces_mock.go -package=cascademocks -source=interfaces.go +// // Package cascademocks is a generated GoMock package. package cascademocks @@ -9,114 +14,117 @@ import ( reflect "reflect" cascade "github.com/LumeraProtocol/supernode/supernode/services/cascade" + supernode "github.com/LumeraProtocol/supernode/supernode/services/common/supernode" gomock "go.uber.org/mock/gomock" ) -// MockTaskFactory is a mock of TaskFactory interface. -type MockTaskFactory struct { +// MockCascadeServiceFactory is a mock of CascadeServiceFactory interface. +type MockCascadeServiceFactory struct { ctrl *gomock.Controller - recorder *MockTaskFactoryMockRecorder + recorder *MockCascadeServiceFactoryMockRecorder + isgomock struct{} } -// MockTaskFactoryMockRecorder is the mock recorder for MockTaskFactory. -type MockTaskFactoryMockRecorder struct { - mock *MockTaskFactory +// MockCascadeServiceFactoryMockRecorder is the mock recorder for MockCascadeServiceFactory. +type MockCascadeServiceFactoryMockRecorder struct { + mock *MockCascadeServiceFactory } -// NewMockTaskFactory creates a new mock instance. -func NewMockTaskFactory(ctrl *gomock.Controller) *MockTaskFactory { - mock := &MockTaskFactory{ctrl: ctrl} - mock.recorder = &MockTaskFactoryMockRecorder{mock} +// NewMockCascadeServiceFactory creates a new mock instance. +func NewMockCascadeServiceFactory(ctrl *gomock.Controller) *MockCascadeServiceFactory { + mock := &MockCascadeServiceFactory{ctrl: ctrl} + mock.recorder = &MockCascadeServiceFactoryMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockTaskFactory) EXPECT() *MockTaskFactoryMockRecorder { +func (m *MockCascadeServiceFactory) EXPECT() *MockCascadeServiceFactoryMockRecorder { return m.recorder } // NewCascadeRegistrationTask mocks base method. -func (m *MockTaskFactory) NewCascadeRegistrationTask() cascade.RegistrationTaskService { +func (m *MockCascadeServiceFactory) NewCascadeRegistrationTask() cascade.CascadeTask { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "NewCascadeRegistrationTask") - ret0, _ := ret[0].(cascade.RegistrationTaskService) + ret0, _ := ret[0].(cascade.CascadeTask) return ret0 } // NewCascadeRegistrationTask indicates an expected call of NewCascadeRegistrationTask. -func (mr *MockTaskFactoryMockRecorder) NewCascadeRegistrationTask() *gomock.Call { +func (mr *MockCascadeServiceFactoryMockRecorder) NewCascadeRegistrationTask() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewCascadeRegistrationTask", reflect.TypeOf((*MockTaskFactory)(nil).NewCascadeRegistrationTask)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewCascadeRegistrationTask", reflect.TypeOf((*MockCascadeServiceFactory)(nil).NewCascadeRegistrationTask)) } -// MockRegistrationTaskService is a mock of RegistrationTaskService interface. -type MockRegistrationTaskService struct { +// MockCascadeTask is a mock of CascadeTask interface. +type MockCascadeTask struct { ctrl *gomock.Controller - recorder *MockRegistrationTaskServiceMockRecorder + recorder *MockCascadeTaskMockRecorder + isgomock struct{} } -// MockRegistrationTaskServiceMockRecorder is the mock recorder for MockRegistrationTaskService. -type MockRegistrationTaskServiceMockRecorder struct { - mock *MockRegistrationTaskService +// MockCascadeTaskMockRecorder is the mock recorder for MockCascadeTask. +type MockCascadeTaskMockRecorder struct { + mock *MockCascadeTask } -// NewMockRegistrationTaskService creates a new mock instance. -func NewMockRegistrationTaskService(ctrl *gomock.Controller) *MockRegistrationTaskService { - mock := &MockRegistrationTaskService{ctrl: ctrl} - mock.recorder = &MockRegistrationTaskServiceMockRecorder{mock} +// NewMockCascadeTask creates a new mock instance. +func NewMockCascadeTask(ctrl *gomock.Controller) *MockCascadeTask { + mock := &MockCascadeTask{ctrl: ctrl} + mock.recorder = &MockCascadeTaskMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockRegistrationTaskService) EXPECT() *MockRegistrationTaskServiceMockRecorder { +func (m *MockCascadeTask) EXPECT() *MockCascadeTaskMockRecorder { return m.recorder } -// Download mocks base method. -func (m *MockRegistrationTaskService) Download(ctx context.Context, req *cascade.DownloadRequest, send func(*cascade.DownloadResponse) error) error { +// CleanupDownload mocks base method. +func (m *MockCascadeTask) CleanupDownload(ctx context.Context, actionID string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Download", ctx, req, send) + ret := m.ctrl.Call(m, "CleanupDownload", ctx, actionID) ret0, _ := ret[0].(error) return ret0 } -// Download indicates an expected call of Download. -func (mr *MockRegistrationTaskServiceMockRecorder) Download(ctx, req, send interface{}) *gomock.Call { +// CleanupDownload indicates an expected call of CleanupDownload. +func (mr *MockCascadeTaskMockRecorder) CleanupDownload(ctx, actionID any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Download", reflect.TypeOf((*MockRegistrationTaskService)(nil).Download), ctx, req, send) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CleanupDownload", reflect.TypeOf((*MockCascadeTask)(nil).CleanupDownload), ctx, actionID) } -// DownloadCleanup mocks base method. -func (m *MockRegistrationTaskService) DownloadCleanup(ctx context.Context, actionID string) error { +// Download mocks base method. +func (m *MockCascadeTask) Download(ctx context.Context, req *cascade.DownloadRequest, send func(*cascade.DownloadResponse) error) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DownloadCleanup", ctx, actionID) + ret := m.ctrl.Call(m, "Download", ctx, req, send) ret0, _ := ret[0].(error) return ret0 } -// DownloadCleanup indicates an expected call of DownloadCleanup. -func (mr *MockRegistrationTaskServiceMockRecorder) DownloadCleanup(ctx, actionID interface{}) *gomock.Call { +// Download indicates an expected call of Download. +func (mr *MockCascadeTaskMockRecorder) Download(ctx, req, send any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadCleanup", reflect.TypeOf((*MockRegistrationTaskService)(nil).DownloadCleanup), ctx, actionID) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Download", reflect.TypeOf((*MockCascadeTask)(nil).Download), ctx, req, send) } -// HealthCheck mocks base method. -func (m *MockRegistrationTaskService) HealthCheck(ctx context.Context) (cascade.HealthCheckResponse, error) { +// GetStatus mocks base method. +func (m *MockCascadeTask) GetStatus(ctx context.Context) (supernode.StatusResponse, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "HealthCheck", ctx) - ret0, _ := ret[0].(cascade.HealthCheckResponse) + ret := m.ctrl.Call(m, "GetStatus", ctx) + ret0, _ := ret[0].(supernode.StatusResponse) ret1, _ := ret[1].(error) return ret0, ret1 } -// HealthCheck indicates an expected call of HealthCheck. -func (mr *MockRegistrationTaskServiceMockRecorder) HealthCheck(ctx interface{}) *gomock.Call { +// GetStatus indicates an expected call of GetStatus. +func (mr *MockCascadeTaskMockRecorder) GetStatus(ctx any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HealthCheck", reflect.TypeOf((*MockRegistrationTaskService)(nil).HealthCheck), ctx) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStatus", reflect.TypeOf((*MockCascadeTask)(nil).GetStatus), ctx) } // Register mocks base method. -func (m *MockRegistrationTaskService) Register(ctx context.Context, req *cascade.RegisterRequest, send func(*cascade.RegisterResponse) error) error { +func (m *MockCascadeTask) Register(ctx context.Context, req *cascade.RegisterRequest, send func(*cascade.RegisterResponse) error) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Register", ctx, req, send) ret0, _ := ret[0].(error) @@ -124,7 +132,7 @@ func (m *MockRegistrationTaskService) Register(ctx context.Context, req *cascade } // Register indicates an expected call of Register. -func (mr *MockRegistrationTaskServiceMockRecorder) Register(ctx, req, send interface{}) *gomock.Call { +func (mr *MockCascadeTaskMockRecorder) Register(ctx, req, send any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Register", reflect.TypeOf((*MockRegistrationTaskService)(nil).Register), ctx, req, send) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Register", reflect.TypeOf((*MockCascadeTask)(nil).Register), ctx, req, send) } diff --git a/supernode/services/cascade/service.go b/supernode/services/cascade/service.go index f0cb1093..8241b323 100644 --- a/supernode/services/cascade/service.go +++ b/supernode/services/cascade/service.go @@ -8,11 +8,12 @@ import ( "github.com/LumeraProtocol/supernode/pkg/lumera" "github.com/LumeraProtocol/supernode/pkg/storage/rqstore" "github.com/LumeraProtocol/supernode/supernode/services/cascade/adaptors" - "github.com/LumeraProtocol/supernode/supernode/services/common" + "github.com/LumeraProtocol/supernode/supernode/services/common/base" + "github.com/LumeraProtocol/supernode/supernode/services/common/supernode" ) type CascadeService struct { - *common.SuperNodeService + *base.SuperNodeService config *Config LumeraClient adaptors.LumeraClient @@ -20,8 +21,12 @@ type CascadeService struct { RQ adaptors.CodecService } +// Compile-time checks to ensure CascadeService implements required interfaces +var _ supernode.TaskProvider = (*CascadeService)(nil) +var _ CascadeServiceFactory = (*CascadeService)(nil) + // NewCascadeRegistrationTask creates a new task for cascade registration -func (service *CascadeService) NewCascadeRegistrationTask() RegistrationTaskService { +func (service *CascadeService) NewCascadeRegistrationTask() CascadeTask { task := NewCascadeRegistrationTask(service) service.Worker.AddTask(task) return task @@ -50,7 +55,7 @@ func (service *CascadeService) GetRunningTasks() []string { func NewCascadeService(config *Config, lumera lumera.Client, p2pClient p2p.Client, codec codec.Codec, rqstore rqstore.Store) *CascadeService { return &CascadeService{ config: config, - SuperNodeService: common.NewSuperNodeService(p2pClient), + SuperNodeService: base.NewSuperNodeService(p2pClient), LumeraClient: adaptors.NewLumeraClient(lumera), P2P: adaptors.NewP2PService(p2pClient, rqstore), RQ: adaptors.NewCodecService(codec), diff --git a/supernode/services/cascade/status.go b/supernode/services/cascade/status.go new file mode 100644 index 00000000..f974ba36 --- /dev/null +++ b/supernode/services/cascade/status.go @@ -0,0 +1,25 @@ +package cascade + +import ( + "context" + + "github.com/LumeraProtocol/supernode/supernode/services/common/supernode" +) + +// StatusResponse represents the status response for cascade service +type StatusResponse = supernode.StatusResponse + +// GetStatus delegates to the common supernode status service +func (service *CascadeService) GetStatus(ctx context.Context) (StatusResponse, error) { + // Create a status service and register the cascade service as a task provider + statusService := supernode.NewSupernodeStatusService() + statusService.RegisterTaskProvider(service) + + // Get the status from the common service + return statusService.GetStatus(ctx) +} + +// GetStatus method for task interface compatibility +func (task *CascadeRegistrationTask) GetStatus(ctx context.Context) (StatusResponse, error) { + return task.CascadeService.GetStatus(ctx) +} diff --git a/supernode/services/cascade/healthcheck_test.go b/supernode/services/cascade/status_test.go similarity index 79% rename from supernode/services/cascade/healthcheck_test.go rename to supernode/services/cascade/status_test.go index e4cdaed1..8dc2de6b 100644 --- a/supernode/services/cascade/healthcheck_test.go +++ b/supernode/services/cascade/status_test.go @@ -4,11 +4,12 @@ import ( "context" "testing" - "github.com/LumeraProtocol/supernode/supernode/services/common" + "github.com/LumeraProtocol/supernode/supernode/services/common/base" + "github.com/LumeraProtocol/supernode/supernode/services/common/supernode" "github.com/stretchr/testify/assert" ) -func TestHealthCheck(t *testing.T) { +func TestGetStatus(t *testing.T) { ctx := context.Background() tests := []struct { @@ -41,11 +42,9 @@ func TestHealthCheck(t *testing.T) { t.Run(tt.name, func(t *testing.T) { // Setup service and worker service := &CascadeService{ - SuperNodeService: common.NewSuperNodeService(nil), + SuperNodeService: base.NewSuperNodeService(nil), } - var primaryTask *CascadeRegistrationTask - go func() { service.RunHelper(ctx, "node-id", "prefix") }() @@ -54,17 +53,10 @@ func TestHealthCheck(t *testing.T) { for i := 0; i < tt.taskCount; i++ { task := NewCascadeRegistrationTask(service) service.Worker.AddTask(task) - if i == 0 { - primaryTask = task - } - } - - // Always call HealthCheck from first task (if any), otherwise create a temp one - if primaryTask == nil { - primaryTask = NewCascadeRegistrationTask(service) } - resp, err := primaryTask.HealthCheck(ctx) + // Call GetStatus from service + resp, err := service.GetStatus(ctx) if tt.expectErr { assert.Error(t, err) return @@ -85,7 +77,7 @@ func TestHealthCheck(t *testing.T) { assert.Contains(t, resp.AvailableServices, "cascade") // Task count check - look for cascade service in the services list - var cascadeService *common.ServiceTasks + var cascadeService *supernode.ServiceTasks for _, service := range resp.Services { if service.ServiceName == "cascade" { cascadeService = &service diff --git a/supernode/services/cascade/task.go b/supernode/services/cascade/task.go index 92096943..538214cf 100644 --- a/supernode/services/cascade/task.go +++ b/supernode/services/cascade/task.go @@ -3,15 +3,16 @@ package cascade import ( "context" "github.com/LumeraProtocol/supernode/pkg/storage/files" - "github.com/LumeraProtocol/supernode/supernode/services/common" + "github.com/LumeraProtocol/supernode/supernode/services/common/base" + "github.com/LumeraProtocol/supernode/supernode/services/common/storage" ) // CascadeRegistrationTask is the task for cascade registration type CascadeRegistrationTask struct { *CascadeService - *common.SuperNodeTask - storage *common.StorageHandler + *base.SuperNodeTask + storage *storage.StorageHandler Asset *files.File dataHash string @@ -22,6 +23,9 @@ const ( logPrefix = "cascade" ) +// Compile-time check to ensure CascadeRegistrationTask implements CascadeTask interface +var _ CascadeTask = (*CascadeRegistrationTask)(nil) + // Run starts the task func (task *CascadeRegistrationTask) Run(ctx context.Context) error { return task.RunHelper(ctx, task.removeArtifacts) @@ -35,7 +39,7 @@ func (task *CascadeRegistrationTask) removeArtifacts() { // NewCascadeRegistrationTask returns a new Task instance func NewCascadeRegistrationTask(service *CascadeService) *CascadeRegistrationTask { task := &CascadeRegistrationTask{ - SuperNodeTask: common.NewSuperNodeTask(logPrefix), + SuperNodeTask: base.NewSuperNodeTask(logPrefix), CascadeService: service, } diff --git a/supernode/services/common/service.go b/supernode/services/common/base/supernode_service.go similarity index 99% rename from supernode/services/common/service.go rename to supernode/services/common/base/supernode_service.go index de19a0b2..524fabbe 100644 --- a/supernode/services/common/service.go +++ b/supernode/services/common/base/supernode_service.go @@ -1,4 +1,4 @@ -package common +package base import ( "context" diff --git a/supernode/services/common/supernode_task.go b/supernode/services/common/base/supernode_task.go similarity index 92% rename from supernode/services/common/supernode_task.go rename to supernode/services/common/base/supernode_task.go index 6535e842..3ee80f99 100644 --- a/supernode/services/common/supernode_task.go +++ b/supernode/services/common/base/supernode_task.go @@ -1,4 +1,4 @@ -package common +package base import ( "context" @@ -8,6 +8,7 @@ import ( "github.com/LumeraProtocol/supernode/pkg/common/task/state" "github.com/LumeraProtocol/supernode/pkg/logtrace" "github.com/LumeraProtocol/supernode/pkg/storage/files" + "github.com/LumeraProtocol/supernode/supernode/services/common" ) // TaskCleanerFunc pointer to func that removes artefacts @@ -53,7 +54,7 @@ func (task *SuperNodeTask) RemoveFile(file *files.File) { // NewSuperNodeTask returns a new Task instance. func NewSuperNodeTask(logPrefix string) *SuperNodeTask { snt := &SuperNodeTask{ - Task: task.New(StatusTaskStarted), + Task: task.New(common.StatusTaskStarted), LogPrefix: logPrefix, } diff --git a/supernode/services/common/supernode_task_test.go b/supernode/services/common/base/supernode_task_test.go similarity index 88% rename from supernode/services/common/supernode_task_test.go rename to supernode/services/common/base/supernode_task_test.go index 2bd4be8a..9e108f59 100644 --- a/supernode/services/common/supernode_task_test.go +++ b/supernode/services/common/base/supernode_task_test.go @@ -1,4 +1,4 @@ -package common_test +package base import ( "context" @@ -7,12 +7,11 @@ import ( "testing" "time" - "github.com/LumeraProtocol/supernode/supernode/services/common" "github.com/stretchr/testify/assert" ) func TestNewSuperNodeTask(t *testing.T) { - task := common.NewSuperNodeTask("testprefix") + task := NewSuperNodeTask("testprefix") assert.NotNil(t, task) assert.Equal(t, "testprefix", task.LogPrefix) } @@ -23,7 +22,7 @@ func TestSuperNodeTask_RunHelper(t *testing.T) { called = true } - snt := common.NewSuperNodeTask("log") + snt := NewSuperNodeTask("log") ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -54,7 +53,7 @@ func TestSuperNodeTask_RunHelper(t *testing.T) { } func TestSuperNodeTask_RunHelper_WithError(t *testing.T) { - snt := common.NewSuperNodeTask("log") + snt := NewSuperNodeTask("log") ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/supernode/services/common/storage_handler.go b/supernode/services/common/storage/handler.go similarity index 99% rename from supernode/services/common/storage_handler.go rename to supernode/services/common/storage/handler.go index 078b5297..16794fd1 100644 --- a/supernode/services/common/storage_handler.go +++ b/supernode/services/common/storage/handler.go @@ -1,4 +1,4 @@ -package common +package storage import ( "context" diff --git a/supernode/services/common/storage_handler_test.go b/supernode/services/common/storage/handler_test.go similarity index 85% rename from supernode/services/common/storage_handler_test.go rename to supernode/services/common/storage/handler_test.go index 369a9a42..4504e407 100644 --- a/supernode/services/common/storage_handler_test.go +++ b/supernode/services/common/storage/handler_test.go @@ -1,11 +1,11 @@ -package common_test +package storage import ( "context" - "github.com/LumeraProtocol/supernode/p2p/mocks" "testing" - "github.com/LumeraProtocol/supernode/supernode/services/common" + "github.com/LumeraProtocol/supernode/p2p/mocks" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) @@ -32,7 +32,7 @@ func (m *mockStore) UpdateIsFirstBatchStored(txID string) error { func TestStoreBytesIntoP2P(t *testing.T) { p2pClient := new(mockP2PClient) - handler := common.NewStorageHandler(p2pClient, "", nil) + handler := NewStorageHandler(p2pClient, "", nil) data := []byte("hello") p2pClient.On("Store", mock.Anything, data, 1).Return("some-id", nil) @@ -45,7 +45,7 @@ func TestStoreBytesIntoP2P(t *testing.T) { func TestStoreBatch(t *testing.T) { p2pClient := new(mockP2PClient) - handler := common.NewStorageHandler(p2pClient, "", nil) + handler := NewStorageHandler(p2pClient, "", nil) ctx := context.WithValue(context.Background(), "task_id", "123") list := [][]byte{[]byte("a"), []byte("b")} diff --git a/supernode/services/common/supernode/metrics.go b/supernode/services/common/supernode/metrics.go new file mode 100644 index 00000000..4940a08f --- /dev/null +++ b/supernode/services/common/supernode/metrics.go @@ -0,0 +1,46 @@ +package supernode + +import ( + "context" + "fmt" + "time" + + "github.com/LumeraProtocol/supernode/pkg/logtrace" + "github.com/shirou/gopsutil/v3/cpu" + "github.com/shirou/gopsutil/v3/mem" +) + +// MetricsCollector handles system resource monitoring +type MetricsCollector struct{} + +// NewMetricsCollector creates a new metrics collector instance +func NewMetricsCollector() *MetricsCollector { + return &MetricsCollector{} +} + +// CollectCPUMetrics gathers CPU usage information +// Returns usage and remaining percentages as formatted strings +func (m *MetricsCollector) CollectCPUMetrics(ctx context.Context) (usage, remaining string, err error) { + percentages, err := cpu.Percent(time.Second, false) + if err != nil { + logtrace.Error(ctx, "failed to get cpu info", logtrace.Fields{logtrace.FieldError: err.Error()}) + return "", "", err + } + + usageFloat := percentages[0] + remainingFloat := 100 - usageFloat + + return fmt.Sprintf("%.2f", usageFloat), fmt.Sprintf("%.2f", remainingFloat), nil +} + +// CollectMemoryMetrics gathers memory usage information +// Returns memory statistics including total, used, available, and usage percentage +func (m *MetricsCollector) CollectMemoryMetrics(ctx context.Context) (total, used, available uint64, usedPerc float64, err error) { + vmem, err := mem.VirtualMemory() + if err != nil { + logtrace.Error(ctx, "failed to get memory info", logtrace.Fields{logtrace.FieldError: err.Error()}) + return 0, 0, 0, 0, err + } + + return vmem.Total, vmem.Used, vmem.Available, vmem.UsedPercent, nil +} \ No newline at end of file diff --git a/supernode/services/common/supernode_status.go b/supernode/services/common/supernode/service.go similarity index 52% rename from supernode/services/common/supernode_status.go rename to supernode/services/common/supernode/service.go index 631ebd58..09290c80 100644 --- a/supernode/services/common/supernode_status.go +++ b/supernode/services/common/supernode/service.go @@ -1,62 +1,35 @@ -package common +package supernode import ( "context" - "fmt" - "time" "github.com/LumeraProtocol/supernode/pkg/logtrace" - "github.com/shirou/gopsutil/v3/cpu" - "github.com/shirou/gopsutil/v3/mem" ) -// StatusResponse represents system status -type StatusResponse struct { - CPU struct { - Usage string - Remaining string - } - Memory struct { - Total uint64 - Used uint64 - Available uint64 - UsedPerc float64 - } - Services []ServiceTasks - AvailableServices []string -} - -// ServiceTasks contains task information for a specific service -type ServiceTasks struct { - ServiceName string - TaskIDs []string - TaskCount int32 -} - -// TaskProvider interface for services to provide their running tasks -type TaskProvider interface { - GetServiceName() string - GetRunningTasks() []string -} // SupernodeStatusService provides centralized status information +// by collecting system metrics and aggregating task information from registered services type SupernodeStatusService struct { - taskProviders []TaskProvider + taskProviders []TaskProvider // List of registered services that provide task information + metrics *MetricsCollector // System metrics collector for CPU and memory stats } -// NewSupernodeStatusService creates a new supernode status service +// NewSupernodeStatusService creates a new supernode status service instance func NewSupernodeStatusService() *SupernodeStatusService { return &SupernodeStatusService{ taskProviders: make([]TaskProvider, 0), + metrics: NewMetricsCollector(), } } // RegisterTaskProvider registers a service as a task provider +// This allows the service to report its running tasks in status responses func (s *SupernodeStatusService) RegisterTaskProvider(provider TaskProvider) { s.taskProviders = append(s.taskProviders, provider) } // GetStatus returns the current system status including all registered services +// This method collects CPU metrics, memory usage, and task information from all providers func (s *SupernodeStatusService) GetStatus(ctx context.Context) (StatusResponse, error) { fields := logtrace.Fields{ logtrace.FieldMethod: "GetStatus", @@ -66,30 +39,25 @@ func (s *SupernodeStatusService) GetStatus(ctx context.Context) (StatusResponse, var resp StatusResponse - // Get CPU information - percentages, err := cpu.Percent(time.Second, false) + // Collect CPU metrics + cpuUsage, cpuRemaining, err := s.metrics.CollectCPUMetrics(ctx) if err != nil { - logtrace.Error(ctx, "failed to get cpu info", logtrace.Fields{logtrace.FieldError: err.Error()}) return resp, err } + resp.CPU.Usage = cpuUsage + resp.CPU.Remaining = cpuRemaining - usage := percentages[0] - remaining := 100 - usage - resp.CPU.Usage = fmt.Sprintf("%.2f", usage) - resp.CPU.Remaining = fmt.Sprintf("%.2f", remaining) - - // Get Memory information - vmem, err := mem.VirtualMemory() + // Collect memory metrics + memTotal, memUsed, memAvailable, memUsedPerc, err := s.metrics.CollectMemoryMetrics(ctx) if err != nil { - logtrace.Error(ctx, "failed to get memory info", logtrace.Fields{logtrace.FieldError: err.Error()}) return resp, err } - resp.Memory.Total = vmem.Total - resp.Memory.Used = vmem.Used - resp.Memory.Available = vmem.Available - resp.Memory.UsedPerc = vmem.UsedPercent + resp.Memory.Total = memTotal + resp.Memory.Used = memUsed + resp.Memory.Available = memAvailable + resp.Memory.UsedPerc = memUsedPerc - // Get service information from all registered providers + // Collect service information from all registered providers resp.Services = make([]ServiceTasks, 0, len(s.taskProviders)) resp.AvailableServices = make([]string, 0, len(s.taskProviders)) @@ -106,17 +74,18 @@ func (s *SupernodeStatusService) GetStatus(ctx context.Context) (StatusResponse, resp.AvailableServices = append(resp.AvailableServices, serviceName) } + // Log summary statistics totalTasks := 0 for _, service := range resp.Services { totalTasks += int(service.TaskCount) } logtrace.Info(ctx, "status data collected", logtrace.Fields{ - "cpu_usage": fmt.Sprintf("%.2f", usage), - "cpu_remaining": fmt.Sprintf("%.2f", remaining), - "mem_total": resp.Memory.Total, - "mem_used": resp.Memory.Used, - "mem_used%": resp.Memory.UsedPerc, + "cpu_usage": cpuUsage, + "cpu_remaining": cpuRemaining, + "mem_total": memTotal, + "mem_used": memUsed, + "mem_used%": memUsedPerc, "service_count": len(resp.Services), "total_tasks": totalTasks, }) diff --git a/supernode/services/common/supernode_status_test.go b/supernode/services/common/supernode/service_test.go similarity index 92% rename from supernode/services/common/supernode_status_test.go rename to supernode/services/common/supernode/service_test.go index 6391e104..df3ad50e 100644 --- a/supernode/services/common/supernode_status_test.go +++ b/supernode/services/common/supernode/service_test.go @@ -1,9 +1,10 @@ -package common +package supernode import ( "context" "testing" + "github.com/LumeraProtocol/supernode/supernode/services/common" "github.com/stretchr/testify/assert" ) @@ -30,7 +31,7 @@ func TestSupernodeStatusService(t *testing.T) { statusService := NewSupernodeStatusService() // Register a mock task provider - mockProvider := &MockTaskProvider{ + mockProvider := &common.MockTaskProvider{ ServiceName: "test-service", TaskIDs: []string{"task1", "task2", "task3"}, } @@ -54,11 +55,11 @@ func TestSupernodeStatusService(t *testing.T) { statusService := NewSupernodeStatusService() // Register multiple mock task providers - cascadeProvider := &MockTaskProvider{ + cascadeProvider := &common.MockTaskProvider{ ServiceName: "cascade", TaskIDs: []string{"cascade1", "cascade2"}, } - senseProvider := &MockTaskProvider{ + senseProvider := &common.MockTaskProvider{ ServiceName: "sense", TaskIDs: []string{"sense1"}, } @@ -96,7 +97,7 @@ func TestSupernodeStatusService(t *testing.T) { statusService := NewSupernodeStatusService() // Register a mock task provider with no tasks - mockProvider := &MockTaskProvider{ + mockProvider := &common.MockTaskProvider{ ServiceName: "empty-service", TaskIDs: []string{}, } diff --git a/supernode/services/common/supernode/types.go b/supernode/services/common/supernode/types.go new file mode 100644 index 00000000..a3aeed8c --- /dev/null +++ b/supernode/services/common/supernode/types.go @@ -0,0 +1,35 @@ +package supernode + +// StatusResponse represents the complete system status information +// including CPU usage, memory statistics, and service details +type StatusResponse struct { + CPU struct { + Usage string // CPU usage percentage as string (e.g., "45.32") + Remaining string // Remaining CPU capacity as string (e.g., "54.68") + } + Memory struct { + Total uint64 // Total memory in bytes + Used uint64 // Used memory in bytes + Available uint64 // Available memory in bytes + UsedPerc float64 // Memory usage percentage (0-100) + } + Services []ServiceTasks // List of registered services and their tasks + AvailableServices []string // Names of all available services +} + +// ServiceTasks contains task information for a specific service +type ServiceTasks struct { + ServiceName string // Name of the service (e.g., "cascade") + TaskIDs []string // List of currently running task IDs + TaskCount int32 // Total number of running tasks +} + +// TaskProvider interface defines the contract for services to provide +// their running task information to the status service +type TaskProvider interface { + // GetServiceName returns the unique name identifier for this service + GetServiceName() string + + // GetRunningTasks returns a list of currently active task IDs + GetRunningTasks() []string +} \ No newline at end of file diff --git a/supernode/services/common/status.go b/supernode/services/common/task_status.go similarity index 100% rename from supernode/services/common/status.go rename to supernode/services/common/task_status.go diff --git a/supernode/services/common/status_test.go b/supernode/services/common/task_status_test.go similarity index 100% rename from supernode/services/common/status_test.go rename to supernode/services/common/task_status_test.go