diff --git a/gen/supernode/action/cascade/service.pb.go b/gen/supernode/action/cascade/service.pb.go index b5105f4e..c106097f 100644 --- a/gen/supernode/action/cascade/service.pb.go +++ b/gen/supernode/action/cascade/service.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.5 -// protoc v5.29.3 +// protoc-gen-go v1.35.2 +// protoc v3.21.12 // source: proto/supernode/action/cascade/service.proto package cascade @@ -11,7 +11,6 @@ import ( protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" sync "sync" - unsafe "unsafe" ) const ( @@ -22,15 +21,15 @@ const ( ) type UploadInputDataRequest struct { - state protoimpl.MessageState `protogen:"open.v1"` - Filename string `protobuf:"bytes,1,opt,name=filename,proto3" json:"filename,omitempty"` - ActionId string `protobuf:"bytes,2,opt,name=action_id,json=actionId,proto3" json:"action_id,omitempty"` - DataHash string `protobuf:"bytes,3,opt,name=data_hash,json=dataHash,proto3" json:"data_hash,omitempty"` - RqMax int32 `protobuf:"varint,4,opt,name=rq_max,json=rqMax,proto3" json:"rq_max,omitempty"` - SignedData string `protobuf:"bytes,5,opt,name=signed_data,json=signedData,proto3" json:"signed_data,omitempty"` - Data []byte `protobuf:"bytes,6,opt,name=data,proto3" json:"data,omitempty"` - unknownFields protoimpl.UnknownFields + state protoimpl.MessageState sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Types that are assignable to RequestType: + // + // *UploadInputDataRequest_Chunk + // *UploadInputDataRequest_Metadata + RequestType isUploadInputDataRequest_RequestType `protobuf_oneof:"request_type"` } func (x *UploadInputDataRequest) Reset() { @@ -63,70 +62,65 @@ func (*UploadInputDataRequest) Descriptor() ([]byte, []int) { return file_proto_supernode_action_cascade_service_proto_rawDescGZIP(), []int{0} } -func (x *UploadInputDataRequest) GetFilename() string { - if x != nil { - return x.Filename +func (m *UploadInputDataRequest) GetRequestType() isUploadInputDataRequest_RequestType { + if m != nil { + return m.RequestType } - return "" + return nil } -func (x *UploadInputDataRequest) GetActionId() string { - if x != nil { - return x.ActionId +func (x *UploadInputDataRequest) GetChunk() *DataChunk { + if x, ok := x.GetRequestType().(*UploadInputDataRequest_Chunk); ok { + return x.Chunk } - return "" + return nil } -func (x *UploadInputDataRequest) GetDataHash() string { - if x != nil { - return x.DataHash +func (x *UploadInputDataRequest) GetMetadata() *Metadata { + if x, ok := x.GetRequestType().(*UploadInputDataRequest_Metadata); ok { + return x.Metadata } - return "" + return nil } -func (x *UploadInputDataRequest) GetRqMax() int32 { - if x != nil { - return x.RqMax - } - return 0 +type isUploadInputDataRequest_RequestType interface { + isUploadInputDataRequest_RequestType() } -func (x *UploadInputDataRequest) GetSignedData() string { - if x != nil { - return x.SignedData - } - return "" +type UploadInputDataRequest_Chunk struct { + Chunk *DataChunk `protobuf:"bytes,1,opt,name=chunk,proto3,oneof"` } -func (x *UploadInputDataRequest) GetData() []byte { - if x != nil { - return x.Data - } - return nil +type UploadInputDataRequest_Metadata struct { + Metadata *Metadata `protobuf:"bytes,2,opt,name=metadata,proto3,oneof"` } -type UploadInputDataResponse struct { - state protoimpl.MessageState `protogen:"open.v1"` - Success bool `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"` - Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` - unknownFields protoimpl.UnknownFields +func (*UploadInputDataRequest_Chunk) isUploadInputDataRequest_RequestType() {} + +func (*UploadInputDataRequest_Metadata) isUploadInputDataRequest_RequestType() {} + +type DataChunk struct { + state protoimpl.MessageState sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` } -func (x *UploadInputDataResponse) Reset() { - *x = UploadInputDataResponse{} +func (x *DataChunk) Reset() { + *x = DataChunk{} mi := &file_proto_supernode_action_cascade_service_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } -func (x *UploadInputDataResponse) String() string { +func (x *DataChunk) String() string { return protoimpl.X.MessageStringOf(x) } -func (*UploadInputDataResponse) ProtoMessage() {} +func (*DataChunk) ProtoMessage() {} -func (x *UploadInputDataResponse) ProtoReflect() protoreflect.Message { +func (x *DataChunk) ProtoReflect() protoreflect.Message { mi := &file_proto_supernode_action_cascade_service_proto_msgTypes[1] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -138,46 +132,44 @@ func (x *UploadInputDataResponse) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use UploadInputDataResponse.ProtoReflect.Descriptor instead. -func (*UploadInputDataResponse) Descriptor() ([]byte, []int) { +// Deprecated: Use DataChunk.ProtoReflect.Descriptor instead. +func (*DataChunk) Descriptor() ([]byte, []int) { return file_proto_supernode_action_cascade_service_proto_rawDescGZIP(), []int{1} } -func (x *UploadInputDataResponse) GetSuccess() bool { +func (x *DataChunk) GetData() []byte { if x != nil { - return x.Success - } - return false -} - -func (x *UploadInputDataResponse) GetMessage() string { - if x != nil { - return x.Message + return x.Data } - return "" + return nil } -type SessionRequest struct { - state protoimpl.MessageState `protogen:"open.v1"` - IsPrimary bool `protobuf:"varint,1,opt,name=is_primary,json=isPrimary,proto3" json:"is_primary,omitempty"` - unknownFields protoimpl.UnknownFields +type Metadata struct { + state protoimpl.MessageState sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Filename string `protobuf:"bytes,1,opt,name=filename,proto3" json:"filename,omitempty"` + ActionId string `protobuf:"bytes,2,opt,name=action_id,json=actionId,proto3" json:"action_id,omitempty"` + DataHash string `protobuf:"bytes,3,opt,name=data_hash,json=dataHash,proto3" json:"data_hash,omitempty"` + RqMax int32 `protobuf:"varint,4,opt,name=rq_max,json=rqMax,proto3" json:"rq_max,omitempty"` + SignedData string `protobuf:"bytes,5,opt,name=signed_data,json=signedData,proto3" json:"signed_data,omitempty"` } -func (x *SessionRequest) Reset() { - *x = SessionRequest{} +func (x *Metadata) Reset() { + *x = Metadata{} mi := &file_proto_supernode_action_cascade_service_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } -func (x *SessionRequest) String() string { +func (x *Metadata) String() string { return protoimpl.X.MessageStringOf(x) } -func (*SessionRequest) ProtoMessage() {} +func (*Metadata) ProtoMessage() {} -func (x *SessionRequest) ProtoReflect() protoreflect.Message { +func (x *Metadata) ProtoReflect() protoreflect.Message { mi := &file_proto_supernode_action_cascade_service_proto_msgTypes[2] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -189,39 +181,69 @@ func (x *SessionRequest) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use SessionRequest.ProtoReflect.Descriptor instead. -func (*SessionRequest) Descriptor() ([]byte, []int) { +// Deprecated: Use Metadata.ProtoReflect.Descriptor instead. +func (*Metadata) Descriptor() ([]byte, []int) { return file_proto_supernode_action_cascade_service_proto_rawDescGZIP(), []int{2} } -func (x *SessionRequest) GetIsPrimary() bool { +func (x *Metadata) GetFilename() string { if x != nil { - return x.IsPrimary + return x.Filename } - return false + return "" } -type SessionReply struct { - state protoimpl.MessageState `protogen:"open.v1"` - SessID string `protobuf:"bytes,1,opt,name=sessID,proto3" json:"sessID,omitempty"` - unknownFields protoimpl.UnknownFields +func (x *Metadata) GetActionId() string { + if x != nil { + return x.ActionId + } + return "" +} + +func (x *Metadata) GetDataHash() string { + if x != nil { + return x.DataHash + } + return "" +} + +func (x *Metadata) GetRqMax() int32 { + if x != nil { + return x.RqMax + } + return 0 +} + +func (x *Metadata) GetSignedData() string { + if x != nil { + return x.SignedData + } + return "" +} + +type UploadInputDataResponse struct { + state protoimpl.MessageState sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Success bool `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"` + Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` } -func (x *SessionReply) Reset() { - *x = SessionReply{} +func (x *UploadInputDataResponse) Reset() { + *x = UploadInputDataResponse{} mi := &file_proto_supernode_action_cascade_service_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } -func (x *SessionReply) String() string { +func (x *UploadInputDataResponse) String() string { return protoimpl.X.MessageStringOf(x) } -func (*SessionReply) ProtoMessage() {} +func (*UploadInputDataResponse) ProtoMessage() {} -func (x *SessionReply) ProtoReflect() protoreflect.Message { +func (x *UploadInputDataResponse) ProtoReflect() protoreflect.Message { mi := &file_proto_supernode_action_cascade_service_proto_msgTypes[3] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -233,72 +255,78 @@ func (x *SessionReply) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use SessionReply.ProtoReflect.Descriptor instead. -func (*SessionReply) Descriptor() ([]byte, []int) { +// Deprecated: Use UploadInputDataResponse.ProtoReflect.Descriptor instead. +func (*UploadInputDataResponse) Descriptor() ([]byte, []int) { return file_proto_supernode_action_cascade_service_proto_rawDescGZIP(), []int{3} } -func (x *SessionReply) GetSessID() string { +func (x *UploadInputDataResponse) GetSuccess() bool { if x != nil { - return x.SessID + return x.Success + } + return false +} + +func (x *UploadInputDataResponse) GetMessage() string { + if x != nil { + return x.Message } return "" } var File_proto_supernode_action_cascade_service_proto protoreflect.FileDescriptor -var file_proto_supernode_action_cascade_service_proto_rawDesc = string([]byte{ +var file_proto_supernode_action_cascade_service_proto_rawDesc = []byte{ 0x0a, 0x2c, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x75, 0x70, 0x65, 0x72, 0x6e, 0x6f, 0x64, 0x65, 0x2f, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, - 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x22, 0xba, 0x01, 0x0a, 0x16, 0x55, 0x70, 0x6c, 0x6f, + 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x22, 0x85, 0x01, 0x0a, 0x16, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x70, 0x75, 0x74, 0x44, 0x61, 0x74, 0x61, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x66, 0x69, 0x6c, 0x65, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x66, 0x69, 0x6c, 0x65, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x1b, - 0x0a, 0x09, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x08, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x64, - 0x61, 0x74, 0x61, 0x5f, 0x68, 0x61, 0x73, 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, - 0x64, 0x61, 0x74, 0x61, 0x48, 0x61, 0x73, 0x68, 0x12, 0x15, 0x0a, 0x06, 0x72, 0x71, 0x5f, 0x6d, - 0x61, 0x78, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x72, 0x71, 0x4d, 0x61, 0x78, 0x12, - 0x1f, 0x0a, 0x0b, 0x73, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x05, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x44, 0x61, 0x74, 0x61, - 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, - 0x64, 0x61, 0x74, 0x61, 0x22, 0x4d, 0x0a, 0x17, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x49, 0x6e, - 0x70, 0x75, 0x74, 0x44, 0x61, 0x74, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x18, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, - 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x65, 0x22, 0x2f, 0x0a, 0x0e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x69, 0x73, 0x5f, 0x70, 0x72, 0x69, 0x6d, - 0x61, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, 0x69, 0x73, 0x50, 0x72, 0x69, - 0x6d, 0x61, 0x72, 0x79, 0x22, 0x26, 0x0a, 0x0c, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, - 0x65, 0x70, 0x6c, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x65, 0x73, 0x73, 0x49, 0x44, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x65, 0x73, 0x73, 0x49, 0x44, 0x32, 0xa5, 0x01, 0x0a, - 0x0e, 0x43, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, - 0x3d, 0x0a, 0x07, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x17, 0x2e, 0x63, 0x61, 0x73, - 0x63, 0x61, 0x64, 0x65, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x53, 0x65, - 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x28, 0x01, 0x30, 0x01, 0x12, 0x54, - 0x0a, 0x0f, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x70, 0x75, 0x74, 0x44, 0x61, 0x74, - 0x61, 0x12, 0x1f, 0x2e, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x55, 0x70, 0x6c, 0x6f, - 0x61, 0x64, 0x49, 0x6e, 0x70, 0x75, 0x74, 0x44, 0x61, 0x74, 0x61, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x55, 0x70, 0x6c, - 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x70, 0x75, 0x74, 0x44, 0x61, 0x74, 0x61, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x42, 0x5a, 0x40, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x73, 0x74, 0x12, 0x2a, 0x0a, 0x05, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x12, 0x2e, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x44, 0x61, 0x74, 0x61, + 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x48, 0x00, 0x52, 0x05, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x12, 0x2f, + 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x11, 0x2e, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, + 0x61, 0x74, 0x61, 0x48, 0x00, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x42, + 0x0e, 0x0a, 0x0c, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x22, + 0x1f, 0x0a, 0x09, 0x44, 0x61, 0x74, 0x61, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x12, 0x12, 0x0a, 0x04, + 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, + 0x22, 0x98, 0x01, 0x0a, 0x08, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x1a, 0x0a, + 0x08, 0x66, 0x69, 0x6c, 0x65, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x08, 0x66, 0x69, 0x6c, 0x65, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x61, 0x63, 0x74, + 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x61, 0x63, + 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x68, + 0x61, 0x73, 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x64, 0x61, 0x74, 0x61, 0x48, + 0x61, 0x73, 0x68, 0x12, 0x15, 0x0a, 0x06, 0x72, 0x71, 0x5f, 0x6d, 0x61, 0x78, 0x18, 0x04, 0x20, + 0x01, 0x28, 0x05, 0x52, 0x05, 0x72, 0x71, 0x4d, 0x61, 0x78, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x69, + 0x67, 0x6e, 0x65, 0x64, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x0a, 0x73, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x44, 0x61, 0x74, 0x61, 0x22, 0x4d, 0x0a, 0x17, 0x55, + 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x70, 0x75, 0x74, 0x44, 0x61, 0x74, 0x61, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, + 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, + 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0x68, 0x0a, 0x0e, 0x43, 0x61, + 0x73, 0x63, 0x61, 0x64, 0x65, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x56, 0x0a, 0x0f, + 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x70, 0x75, 0x74, 0x44, 0x61, 0x74, 0x61, 0x12, + 0x1f, 0x2e, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, + 0x49, 0x6e, 0x70, 0x75, 0x74, 0x44, 0x61, 0x74, 0x61, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x20, 0x2e, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x55, 0x70, 0x6c, 0x6f, 0x61, + 0x64, 0x49, 0x6e, 0x70, 0x75, 0x74, 0x44, 0x61, 0x74, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x28, 0x01, 0x42, 0x42, 0x5a, 0x40, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x4c, 0x75, 0x6d, 0x65, 0x72, 0x61, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2f, 0x73, 0x75, 0x70, 0x65, 0x72, 0x6e, 0x6f, 0x64, 0x65, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x73, 0x75, 0x70, 0x65, 0x72, 0x6e, 0x6f, 0x64, 0x65, 0x2f, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -}) +} var ( file_proto_supernode_action_cascade_service_proto_rawDescOnce sync.Once - file_proto_supernode_action_cascade_service_proto_rawDescData []byte + file_proto_supernode_action_cascade_service_proto_rawDescData = file_proto_supernode_action_cascade_service_proto_rawDesc ) func file_proto_supernode_action_cascade_service_proto_rawDescGZIP() []byte { file_proto_supernode_action_cascade_service_proto_rawDescOnce.Do(func() { - file_proto_supernode_action_cascade_service_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_proto_supernode_action_cascade_service_proto_rawDesc), len(file_proto_supernode_action_cascade_service_proto_rawDesc))) + file_proto_supernode_action_cascade_service_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_supernode_action_cascade_service_proto_rawDescData) }) return file_proto_supernode_action_cascade_service_proto_rawDescData } @@ -306,20 +334,20 @@ func file_proto_supernode_action_cascade_service_proto_rawDescGZIP() []byte { var file_proto_supernode_action_cascade_service_proto_msgTypes = make([]protoimpl.MessageInfo, 4) var file_proto_supernode_action_cascade_service_proto_goTypes = []any{ (*UploadInputDataRequest)(nil), // 0: cascade.UploadInputDataRequest - (*UploadInputDataResponse)(nil), // 1: cascade.UploadInputDataResponse - (*SessionRequest)(nil), // 2: cascade.SessionRequest - (*SessionReply)(nil), // 3: cascade.SessionReply + (*DataChunk)(nil), // 1: cascade.DataChunk + (*Metadata)(nil), // 2: cascade.Metadata + (*UploadInputDataResponse)(nil), // 3: cascade.UploadInputDataResponse } var file_proto_supernode_action_cascade_service_proto_depIdxs = []int32{ - 2, // 0: cascade.CascadeService.Session:input_type -> cascade.SessionRequest - 0, // 1: cascade.CascadeService.UploadInputData:input_type -> cascade.UploadInputDataRequest - 3, // 2: cascade.CascadeService.Session:output_type -> cascade.SessionReply - 1, // 3: cascade.CascadeService.UploadInputData:output_type -> cascade.UploadInputDataResponse - 2, // [2:4] is the sub-list for method output_type - 0, // [0:2] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name + 1, // 0: cascade.UploadInputDataRequest.chunk:type_name -> cascade.DataChunk + 2, // 1: cascade.UploadInputDataRequest.metadata:type_name -> cascade.Metadata + 0, // 2: cascade.CascadeService.UploadInputData:input_type -> cascade.UploadInputDataRequest + 3, // 3: cascade.CascadeService.UploadInputData:output_type -> cascade.UploadInputDataResponse + 3, // [3:4] is the sub-list for method output_type + 2, // [2:3] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name } func init() { file_proto_supernode_action_cascade_service_proto_init() } @@ -327,11 +355,15 @@ func file_proto_supernode_action_cascade_service_proto_init() { if File_proto_supernode_action_cascade_service_proto != nil { return } + file_proto_supernode_action_cascade_service_proto_msgTypes[0].OneofWrappers = []any{ + (*UploadInputDataRequest_Chunk)(nil), + (*UploadInputDataRequest_Metadata)(nil), + } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: unsafe.Slice(unsafe.StringData(file_proto_supernode_action_cascade_service_proto_rawDesc), len(file_proto_supernode_action_cascade_service_proto_rawDesc)), + RawDescriptor: file_proto_supernode_action_cascade_service_proto_rawDesc, NumEnums: 0, NumMessages: 4, NumExtensions: 0, @@ -342,6 +374,7 @@ func file_proto_supernode_action_cascade_service_proto_init() { MessageInfos: file_proto_supernode_action_cascade_service_proto_msgTypes, }.Build() File_proto_supernode_action_cascade_service_proto = out.File + file_proto_supernode_action_cascade_service_proto_rawDesc = nil file_proto_supernode_action_cascade_service_proto_goTypes = nil file_proto_supernode_action_cascade_service_proto_depIdxs = nil } diff --git a/gen/supernode/action/cascade/service_grpc.pb.go b/gen/supernode/action/cascade/service_grpc.pb.go index fc9bf998..db6f86fd 100644 --- a/gen/supernode/action/cascade/service_grpc.pb.go +++ b/gen/supernode/action/cascade/service_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.5.1 -// - protoc v5.29.3 +// - protoc v3.21.12 // source: proto/supernode/action/cascade/service.proto package cascade @@ -19,7 +19,6 @@ import ( const _ = grpc.SupportPackageIsVersion9 const ( - CascadeService_Session_FullMethodName = "/cascade.CascadeService/Session" CascadeService_UploadInputData_FullMethodName = "/cascade.CascadeService/UploadInputData" ) @@ -27,8 +26,7 @@ const ( // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type CascadeServiceClient interface { - Session(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[SessionRequest, SessionReply], error) - UploadInputData(ctx context.Context, in *UploadInputDataRequest, opts ...grpc.CallOption) (*UploadInputDataResponse, error) + UploadInputData(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[UploadInputDataRequest, UploadInputDataResponse], error) } type cascadeServiceClient struct { @@ -39,35 +37,24 @@ func NewCascadeServiceClient(cc grpc.ClientConnInterface) CascadeServiceClient { return &cascadeServiceClient{cc} } -func (c *cascadeServiceClient) Session(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[SessionRequest, SessionReply], error) { +func (c *cascadeServiceClient) UploadInputData(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[UploadInputDataRequest, UploadInputDataResponse], error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - stream, err := c.cc.NewStream(ctx, &CascadeService_ServiceDesc.Streams[0], CascadeService_Session_FullMethodName, cOpts...) + stream, err := c.cc.NewStream(ctx, &CascadeService_ServiceDesc.Streams[0], CascadeService_UploadInputData_FullMethodName, cOpts...) if err != nil { return nil, err } - x := &grpc.GenericClientStream[SessionRequest, SessionReply]{ClientStream: stream} + x := &grpc.GenericClientStream[UploadInputDataRequest, UploadInputDataResponse]{ClientStream: stream} return x, nil } // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. -type CascadeService_SessionClient = grpc.BidiStreamingClient[SessionRequest, SessionReply] - -func (c *cascadeServiceClient) UploadInputData(ctx context.Context, in *UploadInputDataRequest, opts ...grpc.CallOption) (*UploadInputDataResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - out := new(UploadInputDataResponse) - err := c.cc.Invoke(ctx, CascadeService_UploadInputData_FullMethodName, in, out, cOpts...) - if err != nil { - return nil, err - } - return out, nil -} +type CascadeService_UploadInputDataClient = grpc.ClientStreamingClient[UploadInputDataRequest, UploadInputDataResponse] // CascadeServiceServer is the server API for CascadeService service. // All implementations must embed UnimplementedCascadeServiceServer // for forward compatibility. type CascadeServiceServer interface { - Session(grpc.BidiStreamingServer[SessionRequest, SessionReply]) error - UploadInputData(context.Context, *UploadInputDataRequest) (*UploadInputDataResponse, error) + UploadInputData(grpc.ClientStreamingServer[UploadInputDataRequest, UploadInputDataResponse]) error mustEmbedUnimplementedCascadeServiceServer() } @@ -78,11 +65,8 @@ type CascadeServiceServer interface { // pointer dereference when methods are called. type UnimplementedCascadeServiceServer struct{} -func (UnimplementedCascadeServiceServer) Session(grpc.BidiStreamingServer[SessionRequest, SessionReply]) error { - return status.Errorf(codes.Unimplemented, "method Session not implemented") -} -func (UnimplementedCascadeServiceServer) UploadInputData(context.Context, *UploadInputDataRequest) (*UploadInputDataResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method UploadInputData not implemented") +func (UnimplementedCascadeServiceServer) UploadInputData(grpc.ClientStreamingServer[UploadInputDataRequest, UploadInputDataResponse]) error { + return status.Errorf(codes.Unimplemented, "method UploadInputData not implemented") } func (UnimplementedCascadeServiceServer) mustEmbedUnimplementedCascadeServiceServer() {} func (UnimplementedCascadeServiceServer) testEmbeddedByValue() {} @@ -105,30 +89,12 @@ func RegisterCascadeServiceServer(s grpc.ServiceRegistrar, srv CascadeServiceSer s.RegisterService(&CascadeService_ServiceDesc, srv) } -func _CascadeService_Session_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(CascadeServiceServer).Session(&grpc.GenericServerStream[SessionRequest, SessionReply]{ServerStream: stream}) +func _CascadeService_UploadInputData_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(CascadeServiceServer).UploadInputData(&grpc.GenericServerStream[UploadInputDataRequest, UploadInputDataResponse]{ServerStream: stream}) } // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. -type CascadeService_SessionServer = grpc.BidiStreamingServer[SessionRequest, SessionReply] - -func _CascadeService_UploadInputData_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(UploadInputDataRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(CascadeServiceServer).UploadInputData(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: CascadeService_UploadInputData_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(CascadeServiceServer).UploadInputData(ctx, req.(*UploadInputDataRequest)) - } - return interceptor(ctx, in, info, handler) -} +type CascadeService_UploadInputDataServer = grpc.ClientStreamingServer[UploadInputDataRequest, UploadInputDataResponse] // CascadeService_ServiceDesc is the grpc.ServiceDesc for CascadeService service. // It's only intended for direct use with grpc.RegisterService, @@ -136,17 +102,11 @@ func _CascadeService_UploadInputData_Handler(srv interface{}, ctx context.Contex var CascadeService_ServiceDesc = grpc.ServiceDesc{ ServiceName: "cascade.CascadeService", HandlerType: (*CascadeServiceServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "UploadInputData", - Handler: _CascadeService_UploadInputData_Handler, - }, - }, + Methods: []grpc.MethodDesc{}, Streams: []grpc.StreamDesc{ { - StreamName: "Session", - Handler: _CascadeService_Session_Handler, - ServerStreams: true, + StreamName: "UploadInputData", + Handler: _CascadeService_UploadInputData_Handler, ClientStreams: true, }, }, diff --git a/proto/supernode/action/cascade/service.proto b/proto/supernode/action/cascade/service.proto index d204180d..3449fa56 100644 --- a/proto/supernode/action/cascade/service.proto +++ b/proto/supernode/action/cascade/service.proto @@ -1,32 +1,31 @@ syntax = "proto3"; - package cascade; - option go_package = "github.com/LumeraProtocol/supernode/gen/supernode/action/cascade"; -service CascadeService { - rpc Session(stream SessionRequest) returns (stream SessionReply); - rpc UploadInputData (UploadInputDataRequest) returns (UploadInputDataResponse); +service CascadeService { + rpc UploadInputData (stream UploadInputDataRequest) returns (UploadInputDataResponse); } message UploadInputDataRequest { - string filename = 1; - string action_id = 2; - string data_hash = 3; - int32 rq_max = 4; - string signed_data = 5; - bytes data = 6; + oneof request_type { + DataChunk chunk = 1; + Metadata metadata = 2; + } } -message UploadInputDataResponse { - bool success = 1; - string message = 2; +message DataChunk { + bytes data = 1; } -message SessionRequest { - bool is_primary = 1; +message Metadata { + string filename = 1; + string action_id = 2; + string data_hash = 3; + int32 rq_max = 4; + string signed_data = 5; } -message SessionReply { - string sessID = 1; -} +message UploadInputDataResponse { + bool success = 1; + string message = 2; +} \ No newline at end of file diff --git a/supernode/cmd/start.go b/supernode/cmd/start.go index 7be4f29e..9e852e37 100644 --- a/supernode/cmd/start.go +++ b/supernode/cmd/start.go @@ -17,8 +17,9 @@ import ( "github.com/LumeraProtocol/supernode/pkg/raptorq" "github.com/LumeraProtocol/supernode/pkg/storage/rqstore" "github.com/LumeraProtocol/supernode/supernode/config" + "github.com/LumeraProtocol/supernode/supernode/node/action/server/cascade" "github.com/LumeraProtocol/supernode/supernode/node/supernode/server" - "github.com/LumeraProtocol/supernode/supernode/services/cascade" + cascadeService "github.com/LumeraProtocol/supernode/supernode/services/cascade" "github.com/LumeraProtocol/supernode/supernode/services/common" cKeyring "github.com/cosmos/cosmos-sdk/crypto/keyring" @@ -95,35 +96,41 @@ The supernode will connect to the Lumera network and begin participating in the } // Configure cascade service - cascadeService := cascade.NewCascadeService( - &cascade.Config{ + cService := cascadeService.NewCascadeService( + &cascadeService.Config{ Config: common.Config{ SupernodeAccountAddress: appConfig.SupernodeConfig.KeyName, }, - RaptorQServicePort: fmt.Sprintf("%d", appConfig.RaptorQConfig.ServicePort), RaptorQServiceAddress: appConfig.RaptorQConfig.ServiceAddress, RqFilesDir: appConfig.RaptorQConfig.FilesDir, - NumberConnectedNodes: 1, }, lumeraClient, - nil, *p2pService, raptorQClientConnection.RaptorQ(raptorq.NewConfig(), lumeraClient, rqStore), raptorq.NewClient(), rqStore, ) + // Create cascade action server + cascadeActionServer := cascade.NewCascadeActionServer(cService) + + // Configure server serverConfig := &server.Config{ - ListenAddresses: appConfig.SupernodeConfig.IpAddress, // FIXME : confirm - Port: int(appConfig.SupernodeConfig.Port), // FIXME : confirm + + Identity: appConfig.SupernodeConfig.Identity, + ListenAddresses: appConfig.SupernodeConfig.IpAddress, + Port: int(appConfig.SupernodeConfig.Port), } - grpc := server.New(serverConfig, + + // Create gRPC server + grpcServer, err := server.New(serverConfig, "service", - cascadeService, + kr, + cascadeActionServer, ) // Start the services - RunServices(ctx, grpc, cascadeService, *p2pService) + RunServices(ctx, grpcServer, cService, *p2pService) // Set up signal handling for graceful shutdown sigCh := make(chan os.Signal, 1) diff --git a/supernode/config.yml b/supernode/config.yml index 802200a5..c187848b 100644 --- a/supernode/config.yml +++ b/supernode/config.yml @@ -1,6 +1,7 @@ # Supernode Configuration supernode: key_name: "mukey" # Account name for the supernode in keyring + identity: "lumera1uarju67x0hzetfzhgktay3h25pgxdy7yxap2xk" # Identity of the supernode, lumera address ip_address: "0.0.0.0" port: 4444 data_dir: "~/.supernode" # Base directory in home folder @@ -27,6 +28,5 @@ lumera: # RaptorQ Configuration raptorq: - service_address: "0.0.0.0" - service_port: 1234 + service_address: "localhost:50051" files_dir: "~/.supernode/raptorq_files" diff --git a/supernode/config/config.go b/supernode/config/config.go index c34eb1b6..6f59e9d2 100644 --- a/supernode/config/config.go +++ b/supernode/config/config.go @@ -11,6 +11,7 @@ import ( ) type SupernodeConfig struct { + Identity string `yaml:"identity"` KeyName string `yaml:"key_name"` IpAddress string `yaml:"ip_address"` Port uint16 `yaml:"port"` @@ -39,7 +40,6 @@ type LumeraClientConfig struct { type RaptorQConfig struct { ServiceAddress string `yaml:"service_address"` - ServicePort uint16 `yaml:"service_port"` FilesDir string `yaml:"files_dir"` } diff --git a/supernode/node/action/server/cascade/cascade_action_server.go b/supernode/node/action/server/cascade/cascade_action_server.go index 751cec60..3fdedd08 100644 --- a/supernode/node/action/server/cascade/cascade_action_server.go +++ b/supernode/node/action/server/cascade/cascade_action_server.go @@ -1,20 +1,102 @@ package cascade import ( - cascadeGen "github.com/LumeraProtocol/supernode/gen/supernode/action/cascade" - "github.com/LumeraProtocol/supernode/supernode/node/common" - "github.com/LumeraProtocol/supernode/supernode/services/cascade" + "fmt" + "io" + + pb "github.com/LumeraProtocol/supernode/gen/supernode/action/cascade" + "github.com/LumeraProtocol/supernode/pkg/logtrace" + cascadeService "github.com/LumeraProtocol/supernode/supernode/services/cascade" + "google.golang.org/grpc" ) type CascadeActionServer struct { - cascadeGen.UnimplementedCascadeServiceServer - - *common.RegisterCascade + pb.UnimplementedCascadeServiceServer + service *cascadeService.CascadeService } -// NewCascadeActionServer returns a new CascadeActionServer instance. -func NewCascadeActionServer(service *cascade.CascadeService) *CascadeActionServer { +func NewCascadeActionServer(service *cascadeService.CascadeService) *CascadeActionServer { return &CascadeActionServer{ - RegisterCascade: common.NewRegisterCascade(service), + service: service, + } +} + +func (server *CascadeActionServer) Desc() *grpc.ServiceDesc { + return &pb.CascadeService_ServiceDesc +} +func (server *CascadeActionServer) UploadInputData(stream pb.CascadeService_UploadInputDataServer) error { + fields := logtrace.Fields{ + logtrace.FieldMethod: "UploadInputData", + logtrace.FieldModule: "CascadeActionServer", + } + + ctx := stream.Context() + logtrace.Info(ctx, "client streaming request to upload cascade input data received", fields) + + // Collect data chunks + var allData []byte + var metadata *pb.Metadata + + // Process incoming stream + for { + req, err := stream.Recv() + if err == io.EOF { + // End of stream + break + } + if err != nil { + fields[logtrace.FieldError] = err.Error() + logtrace.Error(ctx, "error receiving stream data", fields) + return fmt.Errorf("failed to receive stream data: %w", err) + } + + // Check which type of message we received + switch x := req.RequestType.(type) { + case *pb.UploadInputDataRequest_Chunk: + // Add data chunk to our collection + allData = append(allData, x.Chunk.Data...) + logtrace.Info(ctx, "received data chunk", logtrace.Fields{ + "chunk_size": len(x.Chunk.Data), + "total_size_so_far": len(allData), + }) + + case *pb.UploadInputDataRequest_Metadata: + // Store metadata - this should be the final message + metadata = x.Metadata + logtrace.Info(ctx, "received metadata", logtrace.Fields{ + "filename": metadata.Filename, + "action_id": metadata.ActionId, + "data_hash": metadata.DataHash, + }) + } + } + + // Verify we received metadata + if metadata == nil { + logtrace.Error(ctx, "no metadata received in stream", fields) + return fmt.Errorf("no metadata received") } + + // Process the complete data + task := server.service.NewCascadeRegistrationTask() + res, err := task.UploadInputData(ctx, &cascadeService.UploadInputDataRequest{ + Filename: metadata.Filename, + ActionID: metadata.ActionId, + DataHash: metadata.DataHash, + RqMax: metadata.RqMax, + SignedData: metadata.SignedData, + Data: allData, + }) + + if err != nil { + fields[logtrace.FieldError] = err.Error() + logtrace.Error(ctx, "failed to upload input data", fields) + return fmt.Errorf("cascade services upload input data error: %w", err) + } + + // Send the response + return stream.SendMsg(&pb.UploadInputDataResponse{ + Success: res.Success, + Message: res.Message, + }) } diff --git a/supernode/node/action/server/cascade/session.go b/supernode/node/action/server/cascade/session.go deleted file mode 100644 index 496913bb..00000000 --- a/supernode/node/action/server/cascade/session.go +++ /dev/null @@ -1,73 +0,0 @@ -package cascade - -import ( - "context" - "io" - - pb "github.com/LumeraProtocol/supernode/gen/supernode/action/cascade" - "github.com/LumeraProtocol/supernode/pkg/errors" - "github.com/LumeraProtocol/supernode/pkg/log" - "github.com/LumeraProtocol/supernode/supernode/services/cascade" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/peer" - "google.golang.org/grpc/status" -) - -// Session implements CascadeActionServer RegisterCascadeServer.Session() -func (service *CascadeActionServer) Session(stream pb.CascadeService_SessionServer) error { - ctx, cancel := context.WithCancel(stream.Context()) - defer cancel() - - var task *cascade.CascadeRegistrationTask - - if sessID, ok := service.SessID(ctx); ok { - if task = service.Task(sessID); task == nil { - return errors.Errorf("not found %q task", sessID) - } - } else { - task = service.NewCascadeRegistrationTask() - } - go func() { - <-task.Done() - cancel() - }() - defer task.Cancel() - - peer, _ := peer.FromContext(ctx) - - defer log.WithContext(ctx).WithField("addr", peer.Addr).Debug("Session stream closed") - - req, err := stream.Recv() - if err != nil { - return errors.Errorf("receieve handshake request: %w", err) - } - - if err := task.NetworkHandler.Session(ctx, req.IsPrimary); err != nil { - return err - } - - resp := &pb.SessionReply{ - SessID: task.ID(), - } - - if err := stream.Send(resp); err != nil { - return errors.Errorf("send handshake response: %w", err) - } - log.WithContext(ctx).WithField("resp", resp).Debug("Session response") - - for { - if _, err := stream.Recv(); err != nil { - if err == io.EOF { - return nil - } - switch status.Code(err) { - case codes.Canceled: - log.WithContext(ctx).WithError(err).Error("handshake stream canceled") - return nil - case codes.Unavailable: - return nil - } - return errors.Errorf("handshake stream closed: %w", err) - } - } -} diff --git a/supernode/node/action/server/cascade/upload_cascade_action_input.go b/supernode/node/action/server/cascade/upload_cascade_action_input.go deleted file mode 100644 index 67a81e1e..00000000 --- a/supernode/node/action/server/cascade/upload_cascade_action_input.go +++ /dev/null @@ -1,40 +0,0 @@ -package cascade - -import ( - "context" - "fmt" - - pb "github.com/LumeraProtocol/supernode/gen/supernode/action/cascade" - "github.com/LumeraProtocol/supernode/pkg/logtrace" - cascadeService "github.com/LumeraProtocol/supernode/supernode/services/cascade" -) - -func (s *CascadeActionServer) UploadInputData(ctx context.Context, req *pb.UploadInputDataRequest) (*pb.UploadInputDataResponse, error) { - fields := logtrace.Fields{ - logtrace.FieldMethod: "UploadInputData", - logtrace.FieldModule: "CascadeActionServer", - logtrace.FieldRequest: req, - } - logtrace.Info(ctx, "request to upload cascade input data received", fields) - - task, err := s.TaskFromMD(ctx) - if err != nil { - return nil, err - } - - res, err := task.UploadInputData(ctx, &cascadeService.UploadInputDataRequest{ - Filename: req.Filename, - ActionID: req.ActionId, - DataHash: req.DataHash, - RqMax: req.RqMax, - SignedData: req.SignedData, - Data: req.Data, - }) - if err != nil { - fields[logtrace.FieldError] = err.Error() - logtrace.Error(ctx, "failed to upload input data", fields) - return &pb.UploadInputDataResponse{}, fmt.Errorf("cascade services upload input data error: %w", err) - } - - return &pb.UploadInputDataResponse{Success: res.Success, Message: res.Message}, nil -} diff --git a/supernode/node/common/register_cascade.go b/supernode/node/common/register_cascade.go deleted file mode 100644 index 9f4ef70f..00000000 --- a/supernode/node/common/register_cascade.go +++ /dev/null @@ -1,51 +0,0 @@ -package common - -import ( - "context" - - "google.golang.org/grpc/metadata" - - "github.com/LumeraProtocol/supernode/pkg/errors" - "github.com/LumeraProtocol/supernode/proto" - "github.com/LumeraProtocol/supernode/supernode/services/cascade" -) - -// RegisterCascade represents common grpc services for registration sense. -type RegisterCascade struct { - *cascade.CascadeService -} - -// SessID retrieves SessID from the metadata. -func (service *RegisterCascade) SessID(ctx context.Context) (string, bool) { - md, ok := metadata.FromIncomingContext(ctx) - if !ok { - return "", false - } - - mdVals := md.Get(proto.MetadataKeySessID) - if len(mdVals) == 0 { - return "", false - } - return mdVals[0], true -} - -// TaskFromMD returns task by SessID from the metadata. -func (service *RegisterCascade) TaskFromMD(ctx context.Context) (*cascade.CascadeRegistrationTask, error) { - sessID, ok := service.SessID(ctx) - if !ok { - return nil, errors.New("not found sessID in metadata") - } - - task := service.Task(sessID) - if task == nil { - return nil, errors.Errorf("not found %q task", sessID) - } - return task, nil -} - -// NewRegisterCascade returns a new RegisterSense instance. -func NewRegisterCascade(service *cascade.CascadeService) *RegisterCascade { - return &RegisterCascade{ - CascadeService: service, - } -} diff --git a/supernode/node/supernode/server/common/register_cascade.go b/supernode/node/supernode/server/common/register_cascade.go deleted file mode 100644 index 08cdc0ad..00000000 --- a/supernode/node/supernode/server/common/register_cascade.go +++ /dev/null @@ -1,56 +0,0 @@ -package common - -import ( - "context" - - "google.golang.org/grpc" - "google.golang.org/grpc/metadata" - - "github.com/LumeraProtocol/supernode/pkg/errors" - "github.com/LumeraProtocol/supernode/proto" - "github.com/LumeraProtocol/supernode/supernode/services/cascade" -) - -// RegisterCascade represents common grpc services for registration sense. -type RegisterCascade struct { - *cascade.CascadeService -} - -// SessID retrieves SessID from the metadata. -func (service *RegisterCascade) SessID(ctx context.Context) (string, bool) { - md, ok := metadata.FromIncomingContext(ctx) - if !ok { - return "", false - } - - mdVals := md.Get(proto.MetadataKeySessID) - if len(mdVals) == 0 { - return "", false - } - return mdVals[0], true -} - -// TaskFromMD returns task by SessID from the metadata. -func (service *RegisterCascade) TaskFromMD(ctx context.Context) (*cascade.CascadeRegistrationTask, error) { - sessID, ok := service.SessID(ctx) - if !ok { - return nil, errors.New("not found sessID in metadata") - } - - task := service.Task(sessID) - if task == nil { - return nil, errors.Errorf("not found %q task", sessID) - } - return task, nil -} - -func (service *RegisterCascade) Desc() *grpc.ServiceDesc { - return &grpc.ServiceDesc{ServiceName: "supernode.RegisterCascade", HandlerType: (*RegisterCascade)(nil)} -} - -// NewRegisterCascade returns a new RegisterSense instance. -func NewRegisterCascade(service *cascade.CascadeService) *RegisterCascade { - return &RegisterCascade{ - CascadeService: service, - } -} diff --git a/supernode/node/supernode/server/config.go b/supernode/node/supernode/server/config.go index 722857e0..00728dd2 100644 --- a/supernode/node/supernode/server/config.go +++ b/supernode/node/supernode/server/config.go @@ -7,8 +7,9 @@ const ( // Config contains settings of the supernode server. type Config struct { - ListenAddresses string `mapstructure:"listen_addresses" json:"listen_addresses,omitempty"` - Port int `mapstructure:"port" json:"port,omitempty"` + Identity string + ListenAddresses string + Port int } // NewConfig returns a new Config instance. diff --git a/supernode/node/supernode/server/server.go b/supernode/node/supernode/server/server.go index 4ea26161..8504ae17 100644 --- a/supernode/node/supernode/server/server.go +++ b/supernode/node/supernode/server/server.go @@ -9,66 +9,75 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/health" + healthpb "google.golang.org/grpc/health/grpc_health_v1" + // "github.com/LumeraProtocol/lumera/x/lumeraid/securekeyx" // "github.com/LumeraProtocol/supernode/pkg/errgroup" "github.com/LumeraProtocol/supernode/pkg/errors" "github.com/LumeraProtocol/supernode/pkg/log" + + // ltc "github.com/LumeraProtocol/supernode/pkg/net/credentials" + // "github.com/LumeraProtocol/supernode/pkg/net/credentials/alts/conn" + "github.com/cosmos/cosmos-sdk/crypto/keyring" ) +// NOTE : Implemented with insecure gRPC server for now, +// // secure gRPC server setup is commented out and will be updated later + type service interface { Desc() *grpc.ServiceDesc } // Server represents supernode server type Server struct { - config *Config - services []service - name string - //secClient alts.SecClient - //secInfo *alts.SecInfo + config *Config + services []service + name string + kr keyring.Keyring + grpcServer *grpc.Server + healthServer *health.Server } // Run starts the server func (server *Server) Run(ctx context.Context) error { + // conn.RegisterALTSRecordProtocols() + // defer conn.UnregisterALTSRecordProtocols() grpclog.SetLoggerV2(log.NewLoggerWithErrorLevel()) + // log.WithContext(ctx).Infof("Server identity: %s", server.config.Identity) + log.WithContext(ctx).Infof("Listening on: %s", server.config.ListenAddresses) ctx = log.ContextWithPrefix(ctx, server.name) group, ctx := errgroup.WithContext(ctx) addresses := strings.Split(server.config.ListenAddresses, ",") - grpcServer := server.grpcServer(ctx) - if grpcServer == nil { - return fmt.Errorf("initialize grpc server failed") + if err := server.setupGRPCServer(); err != nil { + return fmt.Errorf("failed to setup gRPC server: %w", err) } for _, address := range addresses { addr := net.JoinHostPort(strings.TrimSpace(address), strconv.Itoa(server.config.Port)) + address := addr // Create a new variable to avoid closure issues group.Go(func() error { - return server.listen(ctx, addr, grpcServer) + return server.listen(ctx, address) }) } return group.Wait() } -func (server *Server) listen(ctx context.Context, address string, grpcServer *grpc.Server) (err error) { +func (server *Server) listen(ctx context.Context, address string) (err error) { listen, err := net.Listen("tcp", address) if err != nil { return errors.Errorf("listen: %w", err).WithField("address", address) } - // The listener that will track connections. - /*listen = &connTrackListener{ - Listener: listen, - connTrack: server.connTrack, // connection tracker - }*/ - errCh := make(chan error, 1) go func() { defer errors.Recover(func(recErr error) { err = recErr }) - log.WithContext(ctx).Infof("gRPC server listening on %q", address) - if err := grpcServer.Serve(listen); err != nil { + log.WithContext(ctx).Infof("gRPC server listening insecurely on %q", address) + if err := server.grpcServer.Serve(listen); err != nil { errCh <- errors.Errorf("serve: %w", err).WithField("address", address) } }() @@ -76,7 +85,7 @@ func (server *Server) listen(ctx context.Context, address string, grpcServer *gr select { case <-ctx.Done(): log.WithContext(ctx).Infof("Shutting down gRPC server at %q", address) - grpcServer.GracefulStop() + server.grpcServer.GracefulStop() case err := <-errCh: return err } @@ -84,58 +93,78 @@ func (server *Server) listen(ctx context.Context, address string, grpcServer *gr return nil } -func (server *Server) grpcServer(ctx context.Context) *grpc.Server { - //if server.secClient == nil || server.secInfo == nil { - // log.WithContext(ctx).Errorln("secClient or secInfo don't initialize") - // return nil - //} - - //// Define the keep-alive parameters - //kaParams := keepalive.ServerParameters{ - // MaxConnectionIdle: 2 * time.Hour, - // MaxConnectionAge: 2 * time.Hour, - // MaxConnectionAgeGrace: 1 * time.Hour, - // Time: 1 * time.Hour, - // Timeout: 30 * time.Minute, - //} - // - //// Define the keep-alive enforcement policy - //kaPolicy := keepalive.EnforcementPolicy{ - // MinTime: 3 * time.Minute, // Minimum time a client should wait before sending keep-alive probes - // PermitWithoutStream: true, // Only allow pings when there are active streams - //} - - var grpcServer *grpc.Server - //if os.Getenv("INTEGRATION_TEST_ENV") == "true" { - // grpcServer = grpc.NewServer(middleware.UnaryInterceptor(), middleware.StreamInterceptor(), grpc.MaxSendMsgSize(100000000), - // grpc.MaxRecvMsgSize(100000000), grpc.KeepaliveParams(kaParams), // Use the keep-alive parameters - // grpc.KeepaliveEnforcementPolicy(kaPolicy)) - //} else { - // - // grpcServer = grpc.NewServer(middleware.UnaryInterceptor(), middleware.StreamInterceptor(), - // middleware.AltsCredential(server.secClient, server.secInfo), grpc.MaxSendMsgSize(100000000), - // grpc.MaxRecvMsgSize(100000000), grpc.KeepaliveParams(kaParams), // Use the keep-alive parameters - // grpc.KeepaliveEnforcementPolicy(kaPolicy)) - //} +func (server *Server) setupGRPCServer() error { + // --- Commented out secure credential setup --- + /* + // Create server credentials + serverCreds, err := ltc.NewServerCreds(