From b2287d6d7e763d3d00fe6e6655955c989d520918 Mon Sep 17 00:00:00 2001 From: j-rafique Date: Wed, 11 Jun 2025 12:50:35 +0500 Subject: [PATCH] implement supernode processing for download --- gen/supernode/action/cascade/service.pb.go | 393 ++++++++++++++---- .../action/cascade/service_grpc.pb.go | 41 ++ pkg/codec/codec.go | 1 + pkg/codec/codec_mock.go | 23 +- pkg/codec/decode.go | 65 +++ pkg/logtrace/fields.go | 1 + pkg/utils/utils.go | 15 + proto/supernode/action/cascade/service.proto | 19 +- .../server/cascade/cascade_action_server.go | 74 ++++ .../cascade/adaptors/mocks/rq_mock.go | 23 +- supernode/services/cascade/adaptors/rq.go | 29 ++ supernode/services/cascade/download.go | 186 +++++++++ supernode/services/cascade/events.go | 1 + supernode/services/cascade/helper.go | 28 ++ supernode/services/cascade/interfaces.go | 1 + .../cascade/mocks/cascade_interfaces_mock.go | 14 + 16 files changed, 810 insertions(+), 104 deletions(-) create mode 100644 pkg/codec/decode.go create mode 100644 supernode/services/cascade/download.go diff --git a/gen/supernode/action/cascade/service.pb.go b/gen/supernode/action/cascade/service.pb.go index 534e5272..64207cd9 100644 --- a/gen/supernode/action/cascade/service.pb.go +++ b/gen/supernode/action/cascade/service.pb.go @@ -35,6 +35,7 @@ const ( SupernodeEventType_RQID_VERIFIED SupernodeEventType = 9 SupernodeEventType_ARTEFACTS_STORED SupernodeEventType = 10 SupernodeEventType_ACTION_FINALIZED SupernodeEventType = 11 + SupernodeEventType_Artefacts_Downloaded SupernodeEventType = 12 ) // Enum value maps for SupernodeEventType. @@ -52,6 +53,7 @@ var ( 9: "RQID_VERIFIED", 10: "ARTEFACTS_STORED", 11: "ACTION_FINALIZED", + 12: "Artefacts_Downloaded", } SupernodeEventType_value = map[string]int32{ "UNKNOWN": 0, @@ -66,6 +68,7 @@ var ( "RQID_VERIFIED": 9, "ARTEFACTS_STORED": 10, "ACTION_FINALIZED": 11, + "Artefacts_Downloaded": 12, } ) @@ -334,6 +337,183 @@ func (x *RegisterResponse) GetTxHash() string { return "" } +type DownloadRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ActionId string `protobuf:"bytes,1,opt,name=action_id,json=actionId,proto3" json:"action_id,omitempty"` +} + +func (x *DownloadRequest) Reset() { + *x = DownloadRequest{} + mi := &file_supernode_action_cascade_service_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *DownloadRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DownloadRequest) ProtoMessage() {} + +func (x *DownloadRequest) ProtoReflect() protoreflect.Message { + mi := &file_supernode_action_cascade_service_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DownloadRequest.ProtoReflect.Descriptor instead. +func (*DownloadRequest) Descriptor() ([]byte, []int) { + return file_supernode_action_cascade_service_proto_rawDescGZIP(), []int{4} +} + +func (x *DownloadRequest) GetActionId() string { + if x != nil { + return x.ActionId + } + return "" +} + +type DownloadResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Types that are assignable to ResponseType: + // + // *DownloadResponse_Event + // *DownloadResponse_Chunk + ResponseType isDownloadResponse_ResponseType `protobuf_oneof:"response_type"` +} + +func (x *DownloadResponse) Reset() { + *x = DownloadResponse{} + mi := &file_supernode_action_cascade_service_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *DownloadResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DownloadResponse) ProtoMessage() {} + +func (x *DownloadResponse) ProtoReflect() protoreflect.Message { + mi := &file_supernode_action_cascade_service_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DownloadResponse.ProtoReflect.Descriptor instead. +func (*DownloadResponse) Descriptor() ([]byte, []int) { + return file_supernode_action_cascade_service_proto_rawDescGZIP(), []int{5} +} + +func (m *DownloadResponse) GetResponseType() isDownloadResponse_ResponseType { + if m != nil { + return m.ResponseType + } + return nil +} + +func (x *DownloadResponse) GetEvent() *DownloadEvent { + if x, ok := x.GetResponseType().(*DownloadResponse_Event); ok { + return x.Event + } + return nil +} + +func (x *DownloadResponse) GetChunk() *DataChunk { + if x, ok := x.GetResponseType().(*DownloadResponse_Chunk); ok { + return x.Chunk + } + return nil +} + +type isDownloadResponse_ResponseType interface { + isDownloadResponse_ResponseType() +} + +type DownloadResponse_Event struct { + Event *DownloadEvent `protobuf:"bytes,1,opt,name=event,proto3,oneof"` +} + +type DownloadResponse_Chunk struct { + Chunk *DataChunk `protobuf:"bytes,2,opt,name=chunk,proto3,oneof"` +} + +func (*DownloadResponse_Event) isDownloadResponse_ResponseType() {} + +func (*DownloadResponse_Chunk) isDownloadResponse_ResponseType() {} + +type DownloadEvent struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + EventType SupernodeEventType `protobuf:"varint,1,opt,name=event_type,json=eventType,proto3,enum=cascade.SupernodeEventType" json:"event_type,omitempty"` + Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` +} + +func (x *DownloadEvent) Reset() { + *x = DownloadEvent{} + mi := &file_supernode_action_cascade_service_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *DownloadEvent) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DownloadEvent) ProtoMessage() {} + +func (x *DownloadEvent) ProtoReflect() protoreflect.Message { + mi := &file_supernode_action_cascade_service_proto_msgTypes[6] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DownloadEvent.ProtoReflect.Descriptor instead. +func (*DownloadEvent) Descriptor() ([]byte, []int) { + return file_supernode_action_cascade_service_proto_rawDescGZIP(), []int{6} +} + +func (x *DownloadEvent) GetEventType() SupernodeEventType { + if x != nil { + return x.EventType + } + return SupernodeEventType_UNKNOWN +} + +func (x *DownloadEvent) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + type HealthCheckRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -342,7 +522,7 @@ type HealthCheckRequest struct { func (x *HealthCheckRequest) Reset() { *x = HealthCheckRequest{} - mi := &file_supernode_action_cascade_service_proto_msgTypes[4] + mi := &file_supernode_action_cascade_service_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -354,7 +534,7 @@ func (x *HealthCheckRequest) String() string { func (*HealthCheckRequest) ProtoMessage() {} func (x *HealthCheckRequest) ProtoReflect() protoreflect.Message { - mi := &file_supernode_action_cascade_service_proto_msgTypes[4] + mi := &file_supernode_action_cascade_service_proto_msgTypes[7] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -367,7 +547,7 @@ func (x *HealthCheckRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use HealthCheckRequest.ProtoReflect.Descriptor instead. func (*HealthCheckRequest) Descriptor() ([]byte, []int) { - return file_supernode_action_cascade_service_proto_rawDescGZIP(), []int{4} + return file_supernode_action_cascade_service_proto_rawDescGZIP(), []int{7} } // The HealthCheckResponse represents system health status. @@ -383,7 +563,7 @@ type HealthCheckResponse struct { func (x *HealthCheckResponse) Reset() { *x = HealthCheckResponse{} - mi := &file_supernode_action_cascade_service_proto_msgTypes[5] + mi := &file_supernode_action_cascade_service_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -395,7 +575,7 @@ func (x *HealthCheckResponse) String() string { func (*HealthCheckResponse) ProtoMessage() {} func (x *HealthCheckResponse) ProtoReflect() protoreflect.Message { - mi := &file_supernode_action_cascade_service_proto_msgTypes[5] + mi := &file_supernode_action_cascade_service_proto_msgTypes[8] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -408,7 +588,7 @@ func (x *HealthCheckResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use HealthCheckResponse.ProtoReflect.Descriptor instead. func (*HealthCheckResponse) Descriptor() ([]byte, []int) { - return file_supernode_action_cascade_service_proto_rawDescGZIP(), []int{5} + return file_supernode_action_cascade_service_proto_rawDescGZIP(), []int{8} } func (x *HealthCheckResponse) GetCpu() *HealthCheckResponse_CPU { @@ -443,7 +623,7 @@ type HealthCheckResponse_CPU struct { func (x *HealthCheckResponse_CPU) Reset() { *x = HealthCheckResponse_CPU{} - mi := &file_supernode_action_cascade_service_proto_msgTypes[6] + mi := &file_supernode_action_cascade_service_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -455,7 +635,7 @@ func (x *HealthCheckResponse_CPU) String() string { func (*HealthCheckResponse_CPU) ProtoMessage() {} func (x *HealthCheckResponse_CPU) ProtoReflect() protoreflect.Message { - mi := &file_supernode_action_cascade_service_proto_msgTypes[6] + mi := &file_supernode_action_cascade_service_proto_msgTypes[9] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -468,7 +648,7 @@ func (x *HealthCheckResponse_CPU) ProtoReflect() protoreflect.Message { // Deprecated: Use HealthCheckResponse_CPU.ProtoReflect.Descriptor instead. func (*HealthCheckResponse_CPU) Descriptor() ([]byte, []int) { - return file_supernode_action_cascade_service_proto_rawDescGZIP(), []int{5, 0} + return file_supernode_action_cascade_service_proto_rawDescGZIP(), []int{8, 0} } func (x *HealthCheckResponse_CPU) GetUsage() string { @@ -498,7 +678,7 @@ type HealthCheckResponse_Memory struct { func (x *HealthCheckResponse_Memory) Reset() { *x = HealthCheckResponse_Memory{} - mi := &file_supernode_action_cascade_service_proto_msgTypes[7] + mi := &file_supernode_action_cascade_service_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -510,7 +690,7 @@ func (x *HealthCheckResponse_Memory) String() string { func (*HealthCheckResponse_Memory) ProtoMessage() {} func (x *HealthCheckResponse_Memory) ProtoReflect() protoreflect.Message { - mi := &file_supernode_action_cascade_service_proto_msgTypes[7] + mi := &file_supernode_action_cascade_service_proto_msgTypes[10] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -523,7 +703,7 @@ func (x *HealthCheckResponse_Memory) ProtoReflect() protoreflect.Message { // Deprecated: Use HealthCheckResponse_Memory.ProtoReflect.Descriptor instead. func (*HealthCheckResponse_Memory) Descriptor() ([]byte, []int) { - return file_supernode_action_cascade_service_proto_rawDescGZIP(), []int{5, 1} + return file_supernode_action_cascade_service_proto_rawDescGZIP(), []int{8, 1} } func (x *HealthCheckResponse_Memory) GetTotal() uint64 { @@ -582,63 +762,86 @@ var file_supernode_action_cascade_service_proto_rawDesc = []byte{ 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x17, 0x0a, 0x07, 0x74, 0x78, 0x5f, 0x68, 0x61, 0x73, 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x06, 0x74, 0x78, 0x48, 0x61, 0x73, 0x68, 0x22, 0x14, 0x0a, 0x12, 0x48, 0x65, 0x61, 0x6c, - 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0xdc, - 0x02, 0x0a, 0x13, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x32, 0x0a, 0x03, 0x63, 0x70, 0x75, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x48, 0x65, - 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x2e, 0x43, 0x50, 0x55, 0x52, 0x03, 0x63, 0x70, 0x75, 0x12, 0x3b, 0x0a, 0x06, 0x6d, 0x65, - 0x6d, 0x6f, 0x72, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x63, 0x61, 0x73, - 0x63, 0x61, 0x64, 0x65, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x4d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x52, - 0x06, 0x6d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x12, 0x2a, 0x0a, 0x11, 0x74, 0x61, 0x73, 0x6b, 0x73, - 0x5f, 0x69, 0x6e, 0x5f, 0x70, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x18, 0x03, 0x20, 0x03, - 0x28, 0x09, 0x52, 0x0f, 0x74, 0x61, 0x73, 0x6b, 0x73, 0x49, 0x6e, 0x50, 0x72, 0x6f, 0x67, 0x72, - 0x65, 0x73, 0x73, 0x1a, 0x39, 0x0a, 0x03, 0x43, 0x50, 0x55, 0x12, 0x14, 0x0a, 0x05, 0x75, 0x73, - 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x75, 0x73, 0x61, 0x67, 0x65, - 0x12, 0x1c, 0x0a, 0x09, 0x72, 0x65, 0x6d, 0x61, 0x69, 0x6e, 0x69, 0x6e, 0x67, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x6d, 0x61, 0x69, 0x6e, 0x69, 0x6e, 0x67, 0x1a, 0x6d, - 0x0a, 0x06, 0x4d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x74, 0x61, - 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x12, 0x12, - 0x0a, 0x04, 0x75, 0x73, 0x65, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, 0x75, 0x73, - 0x65, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x61, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x6c, 0x65, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x61, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x6c, 0x65, - 0x12, 0x1b, 0x0a, 0x09, 0x75, 0x73, 0x65, 0x64, 0x5f, 0x70, 0x65, 0x72, 0x63, 0x18, 0x04, 0x20, - 0x01, 0x28, 0x01, 0x52, 0x08, 0x75, 0x73, 0x65, 0x64, 0x50, 0x65, 0x72, 0x63, 0x2a, 0x9c, 0x02, - 0x0a, 0x12, 0x53, 0x75, 0x70, 0x65, 0x72, 0x6e, 0x6f, 0x64, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, - 0x54, 0x79, 0x70, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, - 0x00, 0x12, 0x14, 0x0a, 0x10, 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x52, 0x45, 0x54, 0x52, - 0x49, 0x45, 0x56, 0x45, 0x44, 0x10, 0x01, 0x12, 0x17, 0x0a, 0x13, 0x41, 0x43, 0x54, 0x49, 0x4f, - 0x4e, 0x5f, 0x46, 0x45, 0x45, 0x5f, 0x56, 0x45, 0x52, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x02, - 0x12, 0x1e, 0x0a, 0x1a, 0x54, 0x4f, 0x50, 0x5f, 0x53, 0x55, 0x50, 0x45, 0x52, 0x4e, 0x4f, 0x44, - 0x45, 0x5f, 0x43, 0x48, 0x45, 0x43, 0x4b, 0x5f, 0x50, 0x41, 0x53, 0x53, 0x45, 0x44, 0x10, 0x03, - 0x12, 0x14, 0x0a, 0x10, 0x4d, 0x45, 0x54, 0x41, 0x44, 0x41, 0x54, 0x41, 0x5f, 0x44, 0x45, 0x43, - 0x4f, 0x44, 0x45, 0x44, 0x10, 0x04, 0x12, 0x16, 0x0a, 0x12, 0x44, 0x41, 0x54, 0x41, 0x5f, 0x48, - 0x41, 0x53, 0x48, 0x5f, 0x56, 0x45, 0x52, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x05, 0x12, 0x11, - 0x0a, 0x0d, 0x49, 0x4e, 0x50, 0x55, 0x54, 0x5f, 0x45, 0x4e, 0x43, 0x4f, 0x44, 0x45, 0x44, 0x10, - 0x06, 0x12, 0x16, 0x0a, 0x12, 0x53, 0x49, 0x47, 0x4e, 0x41, 0x54, 0x55, 0x52, 0x45, 0x5f, 0x56, - 0x45, 0x52, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x07, 0x12, 0x12, 0x0a, 0x0e, 0x52, 0x51, 0x49, - 0x44, 0x5f, 0x47, 0x45, 0x4e, 0x45, 0x52, 0x41, 0x54, 0x45, 0x44, 0x10, 0x08, 0x12, 0x11, 0x0a, - 0x0d, 0x52, 0x51, 0x49, 0x44, 0x5f, 0x56, 0x45, 0x52, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x09, - 0x12, 0x14, 0x0a, 0x10, 0x41, 0x52, 0x54, 0x45, 0x46, 0x41, 0x43, 0x54, 0x53, 0x5f, 0x53, 0x54, - 0x4f, 0x52, 0x45, 0x44, 0x10, 0x0a, 0x12, 0x14, 0x0a, 0x10, 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, - 0x5f, 0x46, 0x49, 0x4e, 0x41, 0x4c, 0x49, 0x5a, 0x45, 0x44, 0x10, 0x0b, 0x32, 0x9f, 0x01, 0x0a, - 0x0e, 0x43, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, - 0x43, 0x0a, 0x08, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x12, 0x18, 0x2e, 0x63, 0x61, - 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, - 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x28, 0x01, 0x30, 0x01, 0x12, 0x48, 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, - 0x65, 0x63, 0x6b, 0x12, 0x1b, 0x2e, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x48, 0x65, - 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x1c, 0x2e, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, - 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x42, - 0x5a, 0x40, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x4c, 0x75, 0x6d, - 0x65, 0x72, 0x61, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2f, 0x73, 0x75, 0x70, 0x65, - 0x72, 0x6e, 0x6f, 0x64, 0x65, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x73, 0x75, 0x70, 0x65, 0x72, 0x6e, - 0x6f, 0x64, 0x65, 0x2f, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x63, 0x61, 0x73, 0x63, 0x61, - 0x64, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x52, 0x06, 0x74, 0x78, 0x48, 0x61, 0x73, 0x68, 0x22, 0x2e, 0x0a, 0x0f, 0x44, 0x6f, 0x77, 0x6e, + 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x61, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, + 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x22, 0x7f, 0x0a, 0x10, 0x44, 0x6f, 0x77, 0x6e, + 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2e, 0x0a, 0x05, + 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x63, 0x61, + 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x44, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x45, 0x76, + 0x65, 0x6e, 0x74, 0x48, 0x00, 0x52, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x2a, 0x0a, 0x05, + 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x63, 0x61, + 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x48, + 0x00, 0x52, 0x05, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x42, 0x0f, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x22, 0x65, 0x0a, 0x0d, 0x44, 0x6f, 0x77, + 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x3a, 0x0a, 0x0a, 0x65, 0x76, + 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1b, + 0x2e, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x53, 0x75, 0x70, 0x65, 0x72, 0x6e, 0x6f, + 0x64, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x09, 0x65, 0x76, 0x65, + 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x22, 0x14, 0x0a, 0x12, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0xdc, 0x02, 0x0a, 0x13, 0x48, 0x65, 0x61, 0x6c, 0x74, + 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x32, + 0x0a, 0x03, 0x63, 0x70, 0x75, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x63, 0x61, + 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, + 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x43, 0x50, 0x55, 0x52, 0x03, 0x63, + 0x70, 0x75, 0x12, 0x3b, 0x0a, 0x06, 0x6d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x48, 0x65, 0x61, + 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x2e, 0x4d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x52, 0x06, 0x6d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x12, + 0x2a, 0x0a, 0x11, 0x74, 0x61, 0x73, 0x6b, 0x73, 0x5f, 0x69, 0x6e, 0x5f, 0x70, 0x72, 0x6f, 0x67, + 0x72, 0x65, 0x73, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0f, 0x74, 0x61, 0x73, 0x6b, + 0x73, 0x49, 0x6e, 0x50, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x1a, 0x39, 0x0a, 0x03, 0x43, + 0x50, 0x55, 0x12, 0x14, 0x0a, 0x05, 0x75, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x05, 0x75, 0x73, 0x61, 0x67, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x72, 0x65, 0x6d, 0x61, + 0x69, 0x6e, 0x69, 0x6e, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x6d, + 0x61, 0x69, 0x6e, 0x69, 0x6e, 0x67, 0x1a, 0x6d, 0x0a, 0x06, 0x4d, 0x65, 0x6d, 0x6f, 0x72, 0x79, + 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, + 0x05, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x12, 0x12, 0x0a, 0x04, 0x75, 0x73, 0x65, 0x64, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, 0x75, 0x73, 0x65, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x61, 0x76, + 0x61, 0x69, 0x6c, 0x61, 0x62, 0x6c, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x61, + 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x6c, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x75, 0x73, 0x65, 0x64, + 0x5f, 0x70, 0x65, 0x72, 0x63, 0x18, 0x04, 0x20, 0x01, 0x28, 0x01, 0x52, 0x08, 0x75, 0x73, 0x65, + 0x64, 0x50, 0x65, 0x72, 0x63, 0x2a, 0xb6, 0x02, 0x0a, 0x12, 0x53, 0x75, 0x70, 0x65, 0x72, 0x6e, + 0x6f, 0x64, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0b, 0x0a, 0x07, + 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x14, 0x0a, 0x10, 0x41, 0x43, 0x54, + 0x49, 0x4f, 0x4e, 0x5f, 0x52, 0x45, 0x54, 0x52, 0x49, 0x45, 0x56, 0x45, 0x44, 0x10, 0x01, 0x12, + 0x17, 0x0a, 0x13, 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x46, 0x45, 0x45, 0x5f, 0x56, 0x45, + 0x52, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x02, 0x12, 0x1e, 0x0a, 0x1a, 0x54, 0x4f, 0x50, 0x5f, + 0x53, 0x55, 0x50, 0x45, 0x52, 0x4e, 0x4f, 0x44, 0x45, 0x5f, 0x43, 0x48, 0x45, 0x43, 0x4b, 0x5f, + 0x50, 0x41, 0x53, 0x53, 0x45, 0x44, 0x10, 0x03, 0x12, 0x14, 0x0a, 0x10, 0x4d, 0x45, 0x54, 0x41, + 0x44, 0x41, 0x54, 0x41, 0x5f, 0x44, 0x45, 0x43, 0x4f, 0x44, 0x45, 0x44, 0x10, 0x04, 0x12, 0x16, + 0x0a, 0x12, 0x44, 0x41, 0x54, 0x41, 0x5f, 0x48, 0x41, 0x53, 0x48, 0x5f, 0x56, 0x45, 0x52, 0x49, + 0x46, 0x49, 0x45, 0x44, 0x10, 0x05, 0x12, 0x11, 0x0a, 0x0d, 0x49, 0x4e, 0x50, 0x55, 0x54, 0x5f, + 0x45, 0x4e, 0x43, 0x4f, 0x44, 0x45, 0x44, 0x10, 0x06, 0x12, 0x16, 0x0a, 0x12, 0x53, 0x49, 0x47, + 0x4e, 0x41, 0x54, 0x55, 0x52, 0x45, 0x5f, 0x56, 0x45, 0x52, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, + 0x07, 0x12, 0x12, 0x0a, 0x0e, 0x52, 0x51, 0x49, 0x44, 0x5f, 0x47, 0x45, 0x4e, 0x45, 0x52, 0x41, + 0x54, 0x45, 0x44, 0x10, 0x08, 0x12, 0x11, 0x0a, 0x0d, 0x52, 0x51, 0x49, 0x44, 0x5f, 0x56, 0x45, + 0x52, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x09, 0x12, 0x14, 0x0a, 0x10, 0x41, 0x52, 0x54, 0x45, + 0x46, 0x41, 0x43, 0x54, 0x53, 0x5f, 0x53, 0x54, 0x4f, 0x52, 0x45, 0x44, 0x10, 0x0a, 0x12, 0x14, + 0x0a, 0x10, 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x46, 0x49, 0x4e, 0x41, 0x4c, 0x49, 0x5a, + 0x45, 0x44, 0x10, 0x0b, 0x12, 0x18, 0x0a, 0x14, 0x41, 0x72, 0x74, 0x65, 0x66, 0x61, 0x63, 0x74, + 0x73, 0x5f, 0x44, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x65, 0x64, 0x10, 0x0c, 0x32, 0xe2, + 0x01, 0x0a, 0x0e, 0x43, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, + 0x65, 0x12, 0x43, 0x0a, 0x08, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x12, 0x18, 0x2e, + 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, + 0x65, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, 0x48, 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, + 0x43, 0x68, 0x65, 0x63, 0x6b, 0x12, 0x1b, 0x2e, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, + 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x48, 0x65, 0x61, + 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x41, 0x0a, 0x08, 0x44, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x18, 0x2e, 0x63, + 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x44, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, + 0x2e, 0x44, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x30, 0x01, 0x42, 0x42, 0x5a, 0x40, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x4c, 0x75, 0x6d, 0x65, 0x72, 0x61, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, + 0x2f, 0x73, 0x75, 0x70, 0x65, 0x72, 0x6e, 0x6f, 0x64, 0x65, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x73, + 0x75, 0x70, 0x65, 0x72, 0x6e, 0x6f, 0x64, 0x65, 0x2f, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2f, + 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -654,33 +857,41 @@ func file_supernode_action_cascade_service_proto_rawDescGZIP() []byte { } var file_supernode_action_cascade_service_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_supernode_action_cascade_service_proto_msgTypes = make([]protoimpl.MessageInfo, 8) +var file_supernode_action_cascade_service_proto_msgTypes = make([]protoimpl.MessageInfo, 11) var file_supernode_action_cascade_service_proto_goTypes = []any{ (SupernodeEventType)(0), // 0: cascade.SupernodeEventType (*RegisterRequest)(nil), // 1: cascade.RegisterRequest (*DataChunk)(nil), // 2: cascade.DataChunk (*Metadata)(nil), // 3: cascade.Metadata (*RegisterResponse)(nil), // 4: cascade.RegisterResponse - (*HealthCheckRequest)(nil), // 5: cascade.HealthCheckRequest - (*HealthCheckResponse)(nil), // 6: cascade.HealthCheckResponse - (*HealthCheckResponse_CPU)(nil), // 7: cascade.HealthCheckResponse.CPU - (*HealthCheckResponse_Memory)(nil), // 8: cascade.HealthCheckResponse.Memory + (*DownloadRequest)(nil), // 5: cascade.DownloadRequest + (*DownloadResponse)(nil), // 6: cascade.DownloadResponse + (*DownloadEvent)(nil), // 7: cascade.DownloadEvent + (*HealthCheckRequest)(nil), // 8: cascade.HealthCheckRequest + (*HealthCheckResponse)(nil), // 9: cascade.HealthCheckResponse + (*HealthCheckResponse_CPU)(nil), // 10: cascade.HealthCheckResponse.CPU + (*HealthCheckResponse_Memory)(nil), // 11: cascade.HealthCheckResponse.Memory } var file_supernode_action_cascade_service_proto_depIdxs = []int32{ - 2, // 0: cascade.RegisterRequest.chunk:type_name -> cascade.DataChunk - 3, // 1: cascade.RegisterRequest.metadata:type_name -> cascade.Metadata - 0, // 2: cascade.RegisterResponse.event_type:type_name -> cascade.SupernodeEventType - 7, // 3: cascade.HealthCheckResponse.cpu:type_name -> cascade.HealthCheckResponse.CPU - 8, // 4: cascade.HealthCheckResponse.memory:type_name -> cascade.HealthCheckResponse.Memory - 1, // 5: cascade.CascadeService.Register:input_type -> cascade.RegisterRequest - 5, // 6: cascade.CascadeService.HealthCheck:input_type -> cascade.HealthCheckRequest - 4, // 7: cascade.CascadeService.Register:output_type -> cascade.RegisterResponse - 6, // 8: cascade.CascadeService.HealthCheck:output_type -> cascade.HealthCheckResponse - 7, // [7:9] is the sub-list for method output_type - 5, // [5:7] is the sub-list for method input_type - 5, // [5:5] is the sub-list for extension type_name - 5, // [5:5] is the sub-list for extension extendee - 0, // [0:5] is the sub-list for field type_name + 2, // 0: cascade.RegisterRequest.chunk:type_name -> cascade.DataChunk + 3, // 1: cascade.RegisterRequest.metadata:type_name -> cascade.Metadata + 0, // 2: cascade.RegisterResponse.event_type:type_name -> cascade.SupernodeEventType + 7, // 3: cascade.DownloadResponse.event:type_name -> cascade.DownloadEvent + 2, // 4: cascade.DownloadResponse.chunk:type_name -> cascade.DataChunk + 0, // 5: cascade.DownloadEvent.event_type:type_name -> cascade.SupernodeEventType + 10, // 6: cascade.HealthCheckResponse.cpu:type_name -> cascade.HealthCheckResponse.CPU + 11, // 7: cascade.HealthCheckResponse.memory:type_name -> cascade.HealthCheckResponse.Memory + 1, // 8: cascade.CascadeService.Register:input_type -> cascade.RegisterRequest + 8, // 9: cascade.CascadeService.HealthCheck:input_type -> cascade.HealthCheckRequest + 5, // 10: cascade.CascadeService.Download:input_type -> cascade.DownloadRequest + 4, // 11: cascade.CascadeService.Register:output_type -> cascade.RegisterResponse + 9, // 12: cascade.CascadeService.HealthCheck:output_type -> cascade.HealthCheckResponse + 6, // 13: cascade.CascadeService.Download:output_type -> cascade.DownloadResponse + 11, // [11:14] is the sub-list for method output_type + 8, // [8:11] is the sub-list for method input_type + 8, // [8:8] is the sub-list for extension type_name + 8, // [8:8] is the sub-list for extension extendee + 0, // [0:8] is the sub-list for field type_name } func init() { file_supernode_action_cascade_service_proto_init() } @@ -692,13 +903,17 @@ func file_supernode_action_cascade_service_proto_init() { (*RegisterRequest_Chunk)(nil), (*RegisterRequest_Metadata)(nil), } + file_supernode_action_cascade_service_proto_msgTypes[5].OneofWrappers = []any{ + (*DownloadResponse_Event)(nil), + (*DownloadResponse_Chunk)(nil), + } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_supernode_action_cascade_service_proto_rawDesc, NumEnums: 1, - NumMessages: 8, + NumMessages: 11, NumExtensions: 0, NumServices: 1, }, diff --git a/gen/supernode/action/cascade/service_grpc.pb.go b/gen/supernode/action/cascade/service_grpc.pb.go index c381031f..576cfea3 100644 --- a/gen/supernode/action/cascade/service_grpc.pb.go +++ b/gen/supernode/action/cascade/service_grpc.pb.go @@ -21,6 +21,7 @@ const _ = grpc.SupportPackageIsVersion9 const ( CascadeService_Register_FullMethodName = "/cascade.CascadeService/Register" CascadeService_HealthCheck_FullMethodName = "/cascade.CascadeService/HealthCheck" + CascadeService_Download_FullMethodName = "/cascade.CascadeService/Download" ) // CascadeServiceClient is the client API for CascadeService service. @@ -29,6 +30,7 @@ const ( type CascadeServiceClient interface { Register(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[RegisterRequest, RegisterResponse], error) HealthCheck(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) + Download(ctx context.Context, in *DownloadRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[DownloadResponse], error) } type cascadeServiceClient struct { @@ -62,12 +64,32 @@ func (c *cascadeServiceClient) HealthCheck(ctx context.Context, in *HealthCheckR return out, nil } +func (c *cascadeServiceClient) Download(ctx context.Context, in *DownloadRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[DownloadResponse], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &CascadeService_ServiceDesc.Streams[1], CascadeService_Download_FullMethodName, cOpts...) + if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[DownloadRequest, DownloadResponse]{ClientStream: stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type CascadeService_DownloadClient = grpc.ServerStreamingClient[DownloadResponse] + // CascadeServiceServer is the server API for CascadeService service. // All implementations must embed UnimplementedCascadeServiceServer // for forward compatibility. type CascadeServiceServer interface { Register(grpc.BidiStreamingServer[RegisterRequest, RegisterResponse]) error HealthCheck(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) + Download(*DownloadRequest, grpc.ServerStreamingServer[DownloadResponse]) error mustEmbedUnimplementedCascadeServiceServer() } @@ -84,6 +106,9 @@ func (UnimplementedCascadeServiceServer) Register(grpc.BidiStreamingServer[Regis func (UnimplementedCascadeServiceServer) HealthCheck(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method HealthCheck not implemented") } +func (UnimplementedCascadeServiceServer) Download(*DownloadRequest, grpc.ServerStreamingServer[DownloadResponse]) error { + return status.Errorf(codes.Unimplemented, "method Download not implemented") +} func (UnimplementedCascadeServiceServer) mustEmbedUnimplementedCascadeServiceServer() {} func (UnimplementedCascadeServiceServer) testEmbeddedByValue() {} @@ -130,6 +155,17 @@ func _CascadeService_HealthCheck_Handler(srv interface{}, ctx context.Context, d return interceptor(ctx, in, info, handler) } +func _CascadeService_Download_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(DownloadRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(CascadeServiceServer).Download(m, &grpc.GenericServerStream[DownloadRequest, DownloadResponse]{ServerStream: stream}) +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type CascadeService_DownloadServer = grpc.ServerStreamingServer[DownloadResponse] + // CascadeService_ServiceDesc is the grpc.ServiceDesc for CascadeService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -149,6 +185,11 @@ var CascadeService_ServiceDesc = grpc.ServiceDesc{ ServerStreams: true, ClientStreams: true, }, + { + StreamName: "Download", + Handler: _CascadeService_Download_Handler, + ServerStreams: true, + }, }, Metadata: "supernode/action/cascade/service.proto", } diff --git a/pkg/codec/codec.go b/pkg/codec/codec.go index e644d3b5..39029569 100644 --- a/pkg/codec/codec.go +++ b/pkg/codec/codec.go @@ -37,4 +37,5 @@ type EncodeRequest struct { type Codec interface { // Encode a file Encode(ctx context.Context, req EncodeRequest) (EncodeResponse, error) + Decode(ctx context.Context, req DecodeRequest) (DecodeResponse, error) } diff --git a/pkg/codec/codec_mock.go b/pkg/codec/codec_mock.go index cb7ed36a..5d3caa6f 100644 --- a/pkg/codec/codec_mock.go +++ b/pkg/codec/codec_mock.go @@ -1,10 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. // Source: codec.go -// -// Generated by this command: -// -// mockgen -destination=codec_mock.go -package=codec -source=codec.go -// // Package codec is a generated GoMock package. package codec @@ -20,7 +15,6 @@ import ( type MockCodec struct { ctrl *gomock.Controller recorder *MockCodecMockRecorder - isgomock struct{} } // MockCodecMockRecorder is the mock recorder for MockCodec. @@ -40,6 +34,21 @@ func (m *MockCodec) EXPECT() *MockCodecMockRecorder { return m.recorder } +// Decode mocks base method. +func (m *MockCodec) Decode(ctx context.Context, req DecodeRequest) (DecodeResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Decode", ctx, req) + ret0, _ := ret[0].(DecodeResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Decode indicates an expected call of Decode. +func (mr *MockCodecMockRecorder) Decode(ctx, req interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Decode", reflect.TypeOf((*MockCodec)(nil).Decode), ctx, req) +} + // Encode mocks base method. func (m *MockCodec) Encode(ctx context.Context, req EncodeRequest) (EncodeResponse, error) { m.ctrl.T.Helper() @@ -50,7 +59,7 @@ func (m *MockCodec) Encode(ctx context.Context, req EncodeRequest) (EncodeRespon } // Encode indicates an expected call of Encode. -func (mr *MockCodecMockRecorder) Encode(ctx, req any) *gomock.Call { +func (mr *MockCodecMockRecorder) Encode(ctx, req interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Encode", reflect.TypeOf((*MockCodec)(nil).Encode), ctx, req) } diff --git a/pkg/codec/decode.go b/pkg/codec/decode.go new file mode 100644 index 00000000..7e36db8c --- /dev/null +++ b/pkg/codec/decode.go @@ -0,0 +1,65 @@ +package codec + +import ( + "context" + "fmt" + "os" + "path/filepath" + + raptorq "github.com/LumeraProtocol/rq-go" + "github.com/LumeraProtocol/supernode/pkg/logtrace" +) + +type DecodeRequest struct { + ActionID string + Layout Layout + Symbols map[string][]byte +} + +type DecodeResponse struct { + Path string + LayoutPath string +} + +func (rq *raptorQ) Decode(ctx context.Context, req DecodeRequest) (DecodeResponse, error) { + fields := logtrace.Fields{ + logtrace.FieldMethod: "Decode", + logtrace.FieldModule: "rq", + logtrace.FieldActionID: req.ActionID, + } + logtrace.Info(ctx, "RaptorQ decode request received", fields) + + processor, err := raptorq.NewDefaultRaptorQProcessor() + if err != nil { + fields[logtrace.FieldError] = err.Error() + return DecodeResponse{}, fmt.Errorf("create RaptorQ processor: %w", err) + } + defer processor.Free() + + symbolsDir := filepath.Join(rq.symbolsBaseDir, req.ActionID) + if err := os.MkdirAll(symbolsDir, 0o755); err != nil { + fields[logtrace.FieldError] = err.Error() + return DecodeResponse{}, fmt.Errorf("mkdir %s: %w", symbolsDir, err) + } + + // Write symbols to disk + for id, data := range req.Symbols { + symbolPath := filepath.Join(symbolsDir, id) + if err := os.WriteFile(symbolPath, data, 0o644); err != nil { + fields[logtrace.FieldError] = err.Error() + return DecodeResponse{}, fmt.Errorf("write symbol %s: %w", id, err) + } + } + logtrace.Info(ctx, "symbols written to disk", fields) + + // Decode + outputPath := filepath.Join(symbolsDir, "output") + if err := processor.DecodeSymbols(symbolsDir, outputPath, ""); err != nil { + fields[logtrace.FieldError] = err.Error() + _ = os.Remove(outputPath) + return DecodeResponse{}, fmt.Errorf("raptorq decode: %w", err) + } + + logtrace.Info(ctx, "RaptorQ decoding completed successfully", fields) + return DecodeResponse{Path: outputPath, LayoutPath: ""}, nil +} diff --git a/pkg/logtrace/fields.go b/pkg/logtrace/fields.go index d8a38b8a..8554137b 100644 --- a/pkg/logtrace/fields.go +++ b/pkg/logtrace/fields.go @@ -19,4 +19,5 @@ const ( FieldTaskID = "task_id" FieldActionID = "action_id" FieldHashHex = "hash_hex" + FieldActionState = "action_state" ) diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index a7f0c78d..f7056592 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -429,6 +429,21 @@ func ZstdCompress(data []byte) ([]byte, error) { return encoder.EncodeAll(data, nil), nil } +func ZstdDecompress(data []byte) ([]byte, error) { + decoder, err := zstd.NewReader(nil) + if err != nil { + return nil, fmt.Errorf("failed to create zstd decoder: %v", err) + } + defer decoder.Close() + + decoded, err := decoder.DecodeAll(data, nil) + if err != nil { + return nil, fmt.Errorf("failed to decompress zstd data: %v", err) + } + + return decoded, nil +} + // HighCompress compresses the data func HighCompress(cctx context.Context, data []byte) ([]byte, error) { ctx, cancel := context.WithTimeout(cctx, highCompressTimeout) diff --git a/proto/supernode/action/cascade/service.proto b/proto/supernode/action/cascade/service.proto index 0615e131..f4afd5be 100644 --- a/proto/supernode/action/cascade/service.proto +++ b/proto/supernode/action/cascade/service.proto @@ -5,7 +5,7 @@ option go_package = "github.com/LumeraProtocol/supernode/gen/supernode/action/ca service CascadeService { rpc Register (stream RegisterRequest) returns (stream RegisterResponse); rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse); - + rpc Download (DownloadRequest) returns (stream DownloadResponse); } message RegisterRequest { @@ -30,6 +30,22 @@ message RegisterResponse { string tx_hash = 3; } +message DownloadRequest { + string action_id = 1; +} + +message DownloadResponse { + oneof response_type { + DownloadEvent event = 1; + DataChunk chunk = 2; + } +} + +message DownloadEvent { + SupernodeEventType event_type = 1; + string message = 2; +} + enum SupernodeEventType { UNKNOWN = 0; ACTION_RETRIEVED = 1; @@ -43,6 +59,7 @@ enum SupernodeEventType { RQID_VERIFIED = 9; ARTEFACTS_STORED = 10; ACTION_FINALIZED = 11; + Artefacts_Downloaded = 12; } message HealthCheckRequest {} diff --git a/supernode/node/action/server/cascade/cascade_action_server.go b/supernode/node/action/server/cascade/cascade_action_server.go index 7e045c91..7feedffe 100644 --- a/supernode/node/action/server/cascade/cascade_action_server.go +++ b/supernode/node/action/server/cascade/cascade_action_server.go @@ -183,3 +183,77 @@ func (server *ActionServer) HealthCheck(ctx context.Context, _ *pb.HealthCheckRe TasksInProgress: resp.TasksInProgress, }, nil } + +func (server *ActionServer) Download(req *pb.DownloadRequest, stream pb.CascadeService_DownloadServer) error { + fields := logtrace.Fields{ + logtrace.FieldMethod: "Download", + logtrace.FieldModule: "CascadeActionServer", + logtrace.FieldActionID: req.GetActionId(), + } + + ctx := stream.Context() + logtrace.Info(ctx, "download request received from client", fields) + + task := server.factory.NewCascadeRegistrationTask() + + var restoredFile []byte + + err := task.Download(ctx, &cascadeService.DownloadRequest{ + ActionID: req.GetActionId(), + }, func(resp *cascadeService.DownloadResponse) error { + grpcResp := &pb.DownloadResponse{ + ResponseType: &pb.DownloadResponse_Event{ + Event: &pb.DownloadEvent{ + EventType: pb.SupernodeEventType(resp.EventType), + Message: resp.Message, + }, + }, + } + + if len(resp.Artefacts) > 0 { + restoredFile = resp.Artefacts + } + + return stream.Send(grpcResp) + }) + + if err != nil { + logtrace.Error(ctx, "error occurred during download process", logtrace.Fields{ + logtrace.FieldError: err.Error(), + }) + return err + } + + if len(restoredFile) == 0 { + logtrace.Error(ctx, "no artefact file retrieved", fields) + return fmt.Errorf("no artefact to stream") + } + logtrace.Info(ctx, "streaming artefact file in chunks", fields) + + // Split and stream the file in 1024 byte chunks + const chunkSize = 1024 + for i := 0; i < len(restoredFile); i += chunkSize { + end := i + chunkSize + if end > len(restoredFile) { + end = len(restoredFile) + } + + err := stream.Send(&pb.DownloadResponse{ + ResponseType: &pb.DownloadResponse_Chunk{ + Chunk: &pb.DataChunk{ + Data: restoredFile[i:end], + }, + }, + }) + + if err != nil { + logtrace.Error(ctx, "failed to stream chunk", logtrace.Fields{ + logtrace.FieldError: err.Error(), + }) + return err + } + } + + logtrace.Info(ctx, "completed streaming all chunks", fields) + return nil +} diff --git a/supernode/services/cascade/adaptors/mocks/rq_mock.go b/supernode/services/cascade/adaptors/mocks/rq_mock.go index 34b73ee1..b51a7222 100644 --- a/supernode/services/cascade/adaptors/mocks/rq_mock.go +++ b/supernode/services/cascade/adaptors/mocks/rq_mock.go @@ -1,10 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. // Source: rq.go -// -// Generated by this command: -// -// mockgen -destination=mocks/rq_mock.go -package=cascadeadaptormocks -source=rq.go -// // Package cascadeadaptormocks is a generated GoMock package. package cascadeadaptormocks @@ -21,7 +16,6 @@ import ( type MockCodecService struct { ctrl *gomock.Controller recorder *MockCodecServiceMockRecorder - isgomock struct{} } // MockCodecServiceMockRecorder is the mock recorder for MockCodecService. @@ -41,6 +35,21 @@ func (m *MockCodecService) EXPECT() *MockCodecServiceMockRecorder { return m.recorder } +// Decode mocks base method. +func (m *MockCodecService) Decode(ctx context.Context, req adaptors.DecodeRequest) (adaptors.DecodeResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Decode", ctx, req) + ret0, _ := ret[0].(adaptors.DecodeResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Decode indicates an expected call of Decode. +func (mr *MockCodecServiceMockRecorder) Decode(ctx, req interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Decode", reflect.TypeOf((*MockCodecService)(nil).Decode), ctx, req) +} + // EncodeInput mocks base method. func (m *MockCodecService) EncodeInput(ctx context.Context, taskID, path string, dataSize int) (adaptors.EncodeResult, error) { m.ctrl.T.Helper() @@ -51,7 +60,7 @@ func (m *MockCodecService) EncodeInput(ctx context.Context, taskID, path string, } // EncodeInput indicates an expected call of EncodeInput. -func (mr *MockCodecServiceMockRecorder) EncodeInput(ctx, taskID, path, dataSize any) *gomock.Call { +func (mr *MockCodecServiceMockRecorder) EncodeInput(ctx, taskID, path, dataSize interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EncodeInput", reflect.TypeOf((*MockCodecService)(nil).EncodeInput), ctx, taskID, path, dataSize) } diff --git a/supernode/services/cascade/adaptors/rq.go b/supernode/services/cascade/adaptors/rq.go index 2228b466..b3ce70fc 100644 --- a/supernode/services/cascade/adaptors/rq.go +++ b/supernode/services/cascade/adaptors/rq.go @@ -11,6 +11,7 @@ import ( //go:generate mockgen -destination=mocks/rq_mock.go -package=cascadeadaptormocks -source=rq.go type CodecService interface { EncodeInput(ctx context.Context, taskID string, path string, dataSize int) (EncodeResult, error) + Decode(ctx context.Context, req DecodeRequest) (DecodeResponse, error) } // EncodeResult represents the outcome of encoding the input data. @@ -45,3 +46,31 @@ func (c *codecImpl) EncodeInput(ctx context.Context, taskID string, path string, Metadata: resp.Metadata, }, nil } + +type DecodeRequest struct { + Symbols map[string][]byte + Layout codec.Layout + ActionID string +} + +type DecodeResponse struct { + LayoutPath string + FilePath string +} + +// Decode decodes the provided symbols and returns the original file +func (c *codecImpl) Decode(ctx context.Context, req DecodeRequest) (DecodeResponse, error) { + resp, err := c.codec.Decode(ctx, codec.DecodeRequest{ + Symbols: req.Symbols, + Layout: req.Layout, + ActionID: req.ActionID, + }) + if err != nil { + return DecodeResponse{}, err + } + + return DecodeResponse{ + LayoutPath: resp.LayoutPath, + FilePath: resp.Path, + }, nil +} diff --git a/supernode/services/cascade/download.go b/supernode/services/cascade/download.go new file mode 100644 index 00000000..bc234b9c --- /dev/null +++ b/supernode/services/cascade/download.go @@ -0,0 +1,186 @@ +package cascade + +import ( + "context" + "fmt" + "os" + "sort" + + actiontypes "github.com/LumeraProtocol/lumera/x/action/v1/types" + "github.com/LumeraProtocol/supernode/pkg/codec" + "github.com/LumeraProtocol/supernode/pkg/errors" + "github.com/LumeraProtocol/supernode/pkg/logtrace" + "github.com/LumeraProtocol/supernode/pkg/utils" + "github.com/LumeraProtocol/supernode/supernode/services/cascade/adaptors" +) + +const ( + requiredSymbolPercent = 9 +) + +type DownloadRequest struct { + ActionID string +} + +type DownloadResponse struct { + EventType SupernodeEventType + Message string + Artefacts []byte +} + +func (task *CascadeRegistrationTask) Download( + ctx context.Context, + req *DownloadRequest, + send func(resp *DownloadResponse) error, +) error { + fields := logtrace.Fields{logtrace.FieldMethod: "Download", logtrace.FieldRequest: req} + logtrace.Info(ctx, "cascade-action-download request received", fields) + + actionDetails, err := task.LumeraClient.GetAction(ctx, req.ActionID) + if err != nil { + fields[logtrace.FieldError] = err + return task.wrapErr(ctx, "failed to get action", err, fields) + } + logtrace.Info(ctx, "action has been retrieved", fields) + task.streamDownloadEvent(SupernodeEventTypeActionRetrieved, "action has been retrieved", nil, send) + + if actionDetails.GetAction().State != actiontypes.ActionStateDone { + err = errors.New("action is not in a valid state") + fields[logtrace.FieldError] = "action state is not done yet" + fields[logtrace.FieldActionState] = actionDetails.GetAction().State + return task.wrapErr(ctx, "action not found", err, fields) + } + logtrace.Info(ctx, "action has been validated", fields) + task.streamDownloadEvent(SupernodeEventTypeActionFinalized, "action state has been validated", nil, send) + + metadata, err := task.decodeCascadeMetadata(ctx, actionDetails.GetAction().Metadata, fields) + if err != nil { + fields[logtrace.FieldError] = err.Error() + return task.wrapErr(ctx, "error decoding cascade metadata", err, fields) + } + logtrace.Info(ctx, "cascade metadata has been decoded", fields) + task.streamDownloadEvent(SupernodeEventTypeMetadataDecoded, "metadata has been decoded", nil, send) + + file, err := task.downloadArtifacts(ctx, actionDetails.GetAction().ActionID, metadata, fields) + if err != nil { + fields[logtrace.FieldError] = err.Error() + return task.wrapErr(ctx, "failed to download artifacts", err, fields) + } + logtrace.Info(ctx, "artifacts have been downloaded", fields) + task.streamDownloadEvent(SupernodeEventTypeArtefactsDownloaded, "artifacts have been downloaded", file, send) + + return nil +} + +func (task *CascadeRegistrationTask) downloadArtifacts(ctx context.Context, actionID string, metadata actiontypes.CascadeMetadata, fields logtrace.Fields) ([]byte, error) { + var layout codec.Layout + for _, rqID := range metadata.RqIdsIds { + rqIDFile, err := task.P2PClient.Retrieve(ctx, rqID) + if err != nil || len(rqIDFile) == 0 { + continue + } + + layout, _, _, err = parseRQMetadataFile(rqIDFile) + + if len(layout.Blocks) == 0 { + logtrace.Info(ctx, "no symbols found in RQ metadata", fields) + continue + } + + if len(layout.Blocks) < int(float64(len(metadata.RqIdsIds))*requiredSymbolPercent/100) { + logtrace.Info(ctx, "not enough symbols found in RQ metadata", fields) + continue + } + + if err == nil { + break + } + } + + if len(layout.Blocks) == 0 { + return nil, errors.New("no symbols found in RQ metadata") + } + + return task.restoreFileFromLayout(ctx, layout, metadata.DataHash, actionID) +} + +func (task *CascadeRegistrationTask) restoreFileFromLayout( + ctx context.Context, + layout codec.Layout, + dataHash string, + actionID string, +) ([]byte, error) { + + fields := logtrace.Fields{ + logtrace.FieldActionID: actionID, + } + var allSymbols []string + for _, block := range layout.Blocks { + allSymbols = append(allSymbols, block.Symbols...) + } + sort.Strings(allSymbols) + + totalSymbols := len(allSymbols) + requiredSymbols := (totalSymbols*requiredSymbolPercent + 99) / 100 + + fields["totalSymbols"] = totalSymbols + fields["requiredSymbols"] = requiredSymbols + logtrace.Info(ctx, "symbols to be retrieved", fields) + + symbols, err := task.P2PClient.BatchRetrieve(ctx, allSymbols, requiredSymbols, actionID) + if err != nil { + fields[logtrace.FieldError] = err.Error() + logtrace.Error(ctx, "failed to retrieve symbols", fields) + return nil, fmt.Errorf("failed to retrieve symbols: %w", err) + } + + fields["retrievedSymbols"] = len(symbols) + logtrace.Info(ctx, "symbols retrieved", fields) + + // 2. Decode symbols using RaptorQ + decodeInfo, err := task.RQ.Decode(ctx, adaptors.DecodeRequest{ + ActionID: actionID, + Symbols: symbols, + Layout: layout, + }) + if err != nil { + fields[logtrace.FieldError] = err.Error() + logtrace.Error(ctx, "failed to decode symbols", fields) + return nil, fmt.Errorf("decode symbols using RaptorQ: %w", err) + } + + file, err := os.ReadFile(decodeInfo.FilePath) + if err != nil { + fields[logtrace.FieldError] = err.Error() + logtrace.Error(ctx, "failed to read file", fields) + return nil, fmt.Errorf("read decoded file: %w", err) + } + + // 3. Validate hash (Blake3) + fileHash, err := utils.Blake3Hash(file) + if err != nil { + fields[logtrace.FieldError] = err.Error() + logtrace.Error(ctx, "failed to do hash", fields) + return nil, fmt.Errorf("hash file: %w", err) + } + + err = task.verifyDataHash(ctx, fileHash, dataHash, fields) + if err != nil { + logtrace.Error(ctx, "failed to verify hash", fields) + fields[logtrace.FieldError] = err.Error() + return nil, err + } + + logtrace.Info(ctx, "file successfully restored and hash verified", fields) + return file, nil +} + +func (task *CascadeRegistrationTask) streamDownloadEvent(eventType SupernodeEventType, msg string, file []byte, send func(resp *DownloadResponse) error) { + _ = send(&DownloadResponse{ + EventType: eventType, + Message: msg, + Artefacts: file, + }) + + return +} diff --git a/supernode/services/cascade/events.go b/supernode/services/cascade/events.go index 6f9239f3..09d517be 100644 --- a/supernode/services/cascade/events.go +++ b/supernode/services/cascade/events.go @@ -15,4 +15,5 @@ const ( SupernodeEventTypeRqIDsVerified SupernodeEventType = 9 SupernodeEventTypeArtefactsStored SupernodeEventType = 10 SupernodeEventTypeActionFinalized SupernodeEventType = 11 + SupernodeEventTypeArtefactsDownloaded SupernodeEventType = 12 ) diff --git a/supernode/services/cascade/helper.go b/supernode/services/cascade/helper.go index 864c30f5..0ae132dd 100644 --- a/supernode/services/cascade/helper.go +++ b/supernode/services/cascade/helper.go @@ -1,6 +1,7 @@ package cascade import ( + "bytes" "context" "encoding/base64" "fmt" @@ -234,3 +235,30 @@ func (task *CascadeRegistrationTask) verifyActionFee(ctx context.Context, action return nil } + +func parseRQMetadataFile(data []byte) (layout codec.Layout, signature string, counter string, err error) { + decompressed, err := utils.ZstdDecompress(data) + if err != nil { + return layout, "", "", errors.Errorf("decompress rq metadata file: %w", err) + } + + // base64EncodeMetadata.Signature.Counter + parts := bytes.Split(decompressed, []byte{SeparatorByte}) + if len(parts) != 3 { + return layout, "", "", errors.New("invalid rq metadata format: expecting 3 parts (layout, signature, counter)") + } + + layoutJson, err := utils.B64Decode(parts[0]) + if err != nil { + return layout, "", "", errors.Errorf("base64 decode failed: %w", err) + } + + if err := json.Unmarshal(layoutJson, &layout); err != nil { + return layout, "", "", errors.Errorf("unmarshal layout: %w", err) + } + + signature = string(parts[1]) + counter = string(parts[2]) + + return layout, signature, counter, nil +} diff --git a/supernode/services/cascade/interfaces.go b/supernode/services/cascade/interfaces.go index d592c8b3..1c27c2f6 100644 --- a/supernode/services/cascade/interfaces.go +++ b/supernode/services/cascade/interfaces.go @@ -15,4 +15,5 @@ type TaskFactory interface { type RegistrationTaskService interface { Register(ctx context.Context, req *RegisterRequest, send func(resp *RegisterResponse) error) error HealthCheck(ctx context.Context) (HealthCheckResponse, error) + Download(ctx context.Context, req *DownloadRequest, send func(resp *DownloadResponse) error) error } diff --git a/supernode/services/cascade/mocks/cascade_interfaces_mock.go b/supernode/services/cascade/mocks/cascade_interfaces_mock.go index b3559fb4..01bbd320 100644 --- a/supernode/services/cascade/mocks/cascade_interfaces_mock.go +++ b/supernode/services/cascade/mocks/cascade_interfaces_mock.go @@ -72,6 +72,20 @@ func (m *MockRegistrationTaskService) EXPECT() *MockRegistrationTaskServiceMockR return m.recorder } +// Download mocks base method. +func (m *MockRegistrationTaskService) Download(ctx context.Context, req *cascade.DownloadRequest, send func(*cascade.DownloadResponse) error) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Download", ctx, req, send) + ret0, _ := ret[0].(error) + return ret0 +} + +// Download indicates an expected call of Download. +func (mr *MockRegistrationTaskServiceMockRecorder) Download(ctx, req, send interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Download", reflect.TypeOf((*MockRegistrationTaskService)(nil).Download), ctx, req, send) +} + // HealthCheck mocks base method. func (m *MockRegistrationTaskService) HealthCheck(ctx context.Context) (cascade.HealthCheckResponse, error) { m.ctrl.T.Helper()