Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
307 changes: 170 additions & 137 deletions gen/supernode/action/cascade/service.pb.go

Large diffs are not rendered by default.

70 changes: 15 additions & 55 deletions gen/supernode/action/cascade/service_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 18 additions & 19 deletions proto/supernode/action/cascade/service.proto
Original file line number Diff line number Diff line change
@@ -1,32 +1,31 @@
syntax = "proto3";

package cascade;

option go_package = "github.com/LumeraProtocol/supernode/gen/supernode/action/cascade";

service CascadeService {
rpc Session(stream SessionRequest) returns (stream SessionReply);
rpc UploadInputData (UploadInputDataRequest) returns (UploadInputDataResponse);
service CascadeService {
rpc UploadInputData (stream UploadInputDataRequest) returns (UploadInputDataResponse);
}

message UploadInputDataRequest {
string filename = 1;
string action_id = 2;
string data_hash = 3;
int32 rq_max = 4;
string signed_data = 5;
bytes data = 6;
oneof request_type {
DataChunk chunk = 1;
Metadata metadata = 2;
}
}

message UploadInputDataResponse {
bool success = 1;
string message = 2;
message DataChunk {
bytes data = 1;
}

message SessionRequest {
bool is_primary = 1;
message Metadata {
string filename = 1;
string action_id = 2;
string data_hash = 3;
int32 rq_max = 4;
string signed_data = 5;
}

message SessionReply {
string sessID = 1;
}
message UploadInputDataResponse {
bool success = 1;
string message = 2;
}
29 changes: 18 additions & 11 deletions supernode/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ import (
"github.com/LumeraProtocol/supernode/pkg/raptorq"
"github.com/LumeraProtocol/supernode/pkg/storage/rqstore"
"github.com/LumeraProtocol/supernode/supernode/config"
"github.com/LumeraProtocol/supernode/supernode/node/action/server/cascade"
"github.com/LumeraProtocol/supernode/supernode/node/supernode/server"
"github.com/LumeraProtocol/supernode/supernode/services/cascade"
cascadeService "github.com/LumeraProtocol/supernode/supernode/services/cascade"
"github.com/LumeraProtocol/supernode/supernode/services/common"

cKeyring "github.com/cosmos/cosmos-sdk/crypto/keyring"
Expand Down Expand Up @@ -95,35 +96,41 @@ The supernode will connect to the Lumera network and begin participating in the
}

// Configure cascade service
cascadeService := cascade.NewCascadeService(
&cascade.Config{
cService := cascadeService.NewCascadeService(
&cascadeService.Config{
Config: common.Config{
SupernodeAccountAddress: appConfig.SupernodeConfig.KeyName,
},
RaptorQServicePort: fmt.Sprintf("%d", appConfig.RaptorQConfig.ServicePort),
RaptorQServiceAddress: appConfig.RaptorQConfig.ServiceAddress,
RqFilesDir: appConfig.RaptorQConfig.FilesDir,
NumberConnectedNodes: 1,
},
lumeraClient,
nil,
*p2pService,
raptorQClientConnection.RaptorQ(raptorq.NewConfig(), lumeraClient, rqStore),
raptorq.NewClient(),
rqStore,
)

// Create cascade action server
cascadeActionServer := cascade.NewCascadeActionServer(cService)

// Configure server
serverConfig := &server.Config{
ListenAddresses: appConfig.SupernodeConfig.IpAddress, // FIXME : confirm
Port: int(appConfig.SupernodeConfig.Port), // FIXME : confirm

Identity: appConfig.SupernodeConfig.Identity,
ListenAddresses: appConfig.SupernodeConfig.IpAddress,
Port: int(appConfig.SupernodeConfig.Port),
}
grpc := server.New(serverConfig,

// Create gRPC server
grpcServer, err := server.New(serverConfig,
"service",
cascadeService,
kr,
cascadeActionServer,
)

// Start the services
RunServices(ctx, grpc, cascadeService, *p2pService)
RunServices(ctx, grpcServer, cService, *p2pService)

// Set up signal handling for graceful shutdown
sigCh := make(chan os.Signal, 1)
Expand Down
4 changes: 2 additions & 2 deletions supernode/config.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Supernode Configuration
supernode:
key_name: "mukey" # Account name for the supernode in keyring
identity: "lumera1uarju67x0hzetfzhgktay3h25pgxdy7yxap2xk" # Identity of the supernode, lumera address
ip_address: "0.0.0.0"
port: 4444
data_dir: "~/.supernode" # Base directory in home folder
Expand All @@ -27,6 +28,5 @@ lumera:

# RaptorQ Configuration
raptorq:
service_address: "0.0.0.0"
service_port: 1234
service_address: "localhost:50051"
files_dir: "~/.supernode/raptorq_files"
2 changes: 1 addition & 1 deletion supernode/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
)

type SupernodeConfig struct {
Identity string `yaml:"identity"`
KeyName string `yaml:"key_name"`
IpAddress string `yaml:"ip_address"`
Port uint16 `yaml:"port"`
Expand Down Expand Up @@ -39,7 +40,6 @@ type LumeraClientConfig struct {

type RaptorQConfig struct {
ServiceAddress string `yaml:"service_address"`
ServicePort uint16 `yaml:"service_port"`
FilesDir string `yaml:"files_dir"`
}

Expand Down
100 changes: 91 additions & 9 deletions supernode/node/action/server/cascade/cascade_action_server.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,102 @@
package cascade

import (
cascadeGen "github.com/LumeraProtocol/supernode/gen/supernode/action/cascade"
"github.com/LumeraProtocol/supernode/supernode/node/common"
"github.com/LumeraProtocol/supernode/supernode/services/cascade"
"fmt"
"io"

pb "github.com/LumeraProtocol/supernode/gen/supernode/action/cascade"
"github.com/LumeraProtocol/supernode/pkg/logtrace"
cascadeService "github.com/LumeraProtocol/supernode/supernode/services/cascade"
"google.golang.org/grpc"
)

type CascadeActionServer struct {
cascadeGen.UnimplementedCascadeServiceServer

*common.RegisterCascade
pb.UnimplementedCascadeServiceServer
service *cascadeService.CascadeService
}

// NewCascadeActionServer returns a new CascadeActionServer instance.
func NewCascadeActionServer(service *cascade.CascadeService) *CascadeActionServer {
func NewCascadeActionServer(service *cascadeService.CascadeService) *CascadeActionServer {
return &CascadeActionServer{
RegisterCascade: common.NewRegisterCascade(service),
service: service,
}
}

func (server *CascadeActionServer) Desc() *grpc.ServiceDesc {
return &pb.CascadeService_ServiceDesc
}
func (server *CascadeActionServer) UploadInputData(stream pb.CascadeService_UploadInputDataServer) error {
fields := logtrace.Fields{
logtrace.FieldMethod: "UploadInputData",
logtrace.FieldModule: "CascadeActionServer",
}

ctx := stream.Context()
logtrace.Info(ctx, "client streaming request to upload cascade input data received", fields)

// Collect data chunks
var allData []byte
var metadata *pb.Metadata

// Process incoming stream
for {
req, err := stream.Recv()
if err == io.EOF {
// End of stream
break
}
if err != nil {
fields[logtrace.FieldError] = err.Error()
logtrace.Error(ctx, "error receiving stream data", fields)
return fmt.Errorf("failed to receive stream data: %w", err)
}

// Check which type of message we received
switch x := req.RequestType.(type) {
case *pb.UploadInputDataRequest_Chunk:
// Add data chunk to our collection
allData = append(allData, x.Chunk.Data...)
logtrace.Info(ctx, "received data chunk", logtrace.Fields{
"chunk_size": len(x.Chunk.Data),
"total_size_so_far": len(allData),
})

case *pb.UploadInputDataRequest_Metadata:
// Store metadata - this should be the final message
metadata = x.Metadata
logtrace.Info(ctx, "received metadata", logtrace.Fields{
"filename": metadata.Filename,
"action_id": metadata.ActionId,
"data_hash": metadata.DataHash,
})
}
}

// Verify we received metadata
if metadata == nil {
logtrace.Error(ctx, "no metadata received in stream", fields)
return fmt.Errorf("no metadata received")
}

// Process the complete data
task := server.service.NewCascadeRegistrationTask()
res, err := task.UploadInputData(ctx, &cascadeService.UploadInputDataRequest{
Filename: metadata.Filename,
ActionID: metadata.ActionId,
DataHash: metadata.DataHash,
RqMax: metadata.RqMax,
SignedData: metadata.SignedData,
Data: allData,
})

if err != nil {
fields[logtrace.FieldError] = err.Error()
logtrace.Error(ctx, "failed to upload input data", fields)
return fmt.Errorf("cascade services upload input data error: %w", err)
}

// Send the response
return stream.SendMsg(&pb.UploadInputDataResponse{
Success: res.Success,
Message: res.Message,
})
}
Loading