diff --git a/sdk/action/client.go b/sdk/action/client.go index 15c9b46f..b2a5092f 100644 --- a/sdk/action/client.go +++ b/sdk/action/client.go @@ -53,18 +53,7 @@ func NewClient(ctx context.Context, config config.Config, logger log.Logger, key } // StartCascade initiates a cascade operation -func (c *ClientImpl) StartCascade(ctx context.Context, - fileHash string, - actionID string, - filePath string, - signedData string, -) (string, error) { - c.logger.Debug(ctx, "Starting cascade operation", - "fileHash", fileHash, - "actionID", actionID, - "filePath", filePath, - ) - +func (c *ClientImpl) StartCascade(ctx context.Context, fileHash string, actionID string, filePath string, signedData string) (string, error) { if fileHash == "" { c.logger.Error(ctx, "Empty file hash provided") return "", ErrEmptyFileHash @@ -83,7 +72,7 @@ func (c *ClientImpl) StartCascade(ctx context.Context, return "", ErrEmptyFileNotFound } - taskID, err := c.taskManager.CreateCascadeTask(ctx, fileHash, actionID, filePath, signedData) + taskID, err := c.taskManager.CreateCascadeTask(ctx, actionID, filePath) if err != nil { c.logger.Error(ctx, "Failed to create cascade task", "error", err) return "", fmt.Errorf("failed to create cascade task: %w", err) diff --git a/sdk/adapters/supernodeservice/adapter.go b/sdk/adapters/supernodeservice/adapter.go index 38800380..9f51d5f5 100644 --- a/sdk/adapters/supernodeservice/adapter.go +++ b/sdk/adapters/supernodeservice/adapter.go @@ -30,17 +30,13 @@ func NewCascadeAdapter(ctx context.Context, client cascade.CascadeServiceClient, } } -func (a *cascadeAdapter) UploadInputData(ctx context.Context, in *UploadInputDataRequest, opts ...grpc.CallOption) (*UploadInputDataResponse, error) { - a.logger.Debug(ctx, "Uploading input data through adapter", - "filename", in.Filename, - "actionID", in.ActionID, - "filePath", in.FilePath) +func (a *cascadeAdapter) RegisterCascade(ctx context.Context, in *RegisterCascadeRequest, opts ...grpc.CallOption) (*RegisterCascadeResponse, error) { + a.logger.Debug(ctx, "RegisterCascade through adapter", "task_id", in.TaskID, "actionID", in.ActionID, "filePath", in.FilePath) // Open the file for reading file, err := os.Open(in.FilePath) if err != nil { a.logger.Error(ctx, "Failed to open file for reading", - "filePath", in.FilePath, "error", err) return nil, fmt.Errorf("failed to open file: %w", err) } @@ -50,18 +46,16 @@ func (a *cascadeAdapter) UploadInputData(ctx context.Context, in *UploadInputDat fileInfo, err := file.Stat() if err != nil { a.logger.Error(ctx, "Failed to get file stats", - "filePath", in.FilePath, "error", err) return nil, fmt.Errorf("failed to get file stats: %w", err) } fileSize := fileInfo.Size() a.logger.Debug(ctx, "File opened for streaming", - "filePath", in.FilePath, "fileSize", fileSize) // Create the client stream - stream, err := a.client.UploadInputData(ctx, opts...) + stream, err := a.client.Register(ctx, opts...) if err != nil { a.logger.Error(ctx, "Failed to create upload stream", "error", err) @@ -90,8 +84,8 @@ func (a *cascadeAdapter) UploadInputData(ctx context.Context, in *UploadInputDat } // Only send what was actually read - chunk := &cascade.UploadInputDataRequest{ - RequestType: &cascade.UploadInputDataRequest_Chunk{ + chunk := &cascade.RegisterRequest{ + RequestType: &cascade.RegisterRequest_Chunk{ Chunk: &cascade.DataChunk{ Data: buffer[:n], }, @@ -99,68 +93,46 @@ func (a *cascadeAdapter) UploadInputData(ctx context.Context, in *UploadInputDat } if err := stream.Send(chunk); err != nil { - a.logger.Error(ctx, "Failed to send data chunk", - "chunkIndex", chunkIndex, - "error", err) + a.logger.Error(ctx, "Failed to send data chunk", "chunkIndex", chunkIndex, "error", err) return nil, fmt.Errorf("failed to send chunk: %w", err) } bytesRead += int64(n) progress := float64(bytesRead) / float64(fileSize) * 100 - a.logger.Debug(ctx, "Sent data chunk", - "chunkIndex", chunkIndex, - "chunkSize", n, - "progress", fmt.Sprintf("%.1f%%", progress)) + a.logger.Debug(ctx, "Sent data chunk", "chunkIndex", chunkIndex, "chunkSize", n, "progress", fmt.Sprintf("%.1f%%", progress)) chunkIndex++ } // Send metadata as the final message - metadata := &cascade.UploadInputDataRequest{ - RequestType: &cascade.UploadInputDataRequest_Metadata{ + metadata := &cascade.RegisterRequest{ + RequestType: &cascade.RegisterRequest_Metadata{ Metadata: &cascade.Metadata{ - Filename: in.Filename, - ActionId: in.ActionID, - DataHash: in.DataHash, - RqMax: in.RqMax, - SignedData: in.SignedData, + TaskId: in.TaskID, + ActionId: in.ActionID, }, }, } if err := stream.Send(metadata); err != nil { - a.logger.Error(ctx, "Failed to send metadata", - "filename", in.Filename, - "actionID", in.ActionID, - "error", err) + a.logger.Error(ctx, "Failed to send metadata", "task_id", in.TaskID, "actionID", in.ActionID, "error", err) return nil, fmt.Errorf("failed to send metadata: %w", err) } - a.logger.Debug(ctx, "Sent metadata", - "filename", in.Filename, - "actionID", in.ActionID) - resp, err := stream.CloseAndRecv() if err != nil { - a.logger.Error(ctx, "Failed to close stream and receive response", - "filename", in.Filename, - "actionID", in.ActionID, - "error", err) + a.logger.Error(ctx, "Failed to close stream and receive response", "task_id", in.TaskID, "actionID", in.ActionID, "error", err) return nil, fmt.Errorf("failed to receive response: %w", err) } - response := &UploadInputDataResponse{ + response := &RegisterCascadeResponse{ Success: resp.Success, Message: resp.Message, } - a.logger.Info(ctx, "Successfully uploaded input data", - "filename", in.Filename, - "actionID", in.ActionID, - "fileSize", fileSize, - "success", resp.Success, - "message", resp.Message) + a.logger.Info(ctx, "Successfully Registered with Supernode", "task_id", in.TaskID, "actionID", in.ActionID, "fileSize", fileSize, + "success", resp.Success, "message", resp.Message) return response, nil } diff --git a/sdk/adapters/supernodeservice/types.go b/sdk/adapters/supernodeservice/types.go index 6137783a..1fb04870 100644 --- a/sdk/adapters/supernodeservice/types.go +++ b/sdk/adapters/supernodeservice/types.go @@ -6,20 +6,17 @@ import ( "google.golang.org/grpc" ) -type UploadInputDataRequest struct { - Filename string - ActionID string - DataHash string - SignedData string - RqMax int32 - FilePath string +type RegisterCascadeRequest struct { + ActionID string + TaskID string + FilePath string } -type UploadInputDataResponse struct { +type RegisterCascadeResponse struct { Success bool Message string } type CascadeServiceClient interface { - UploadInputData(ctx context.Context, in *UploadInputDataRequest, opts ...grpc.CallOption) (*UploadInputDataResponse, error) + RegisterCascade(ctx context.Context, in *RegisterCascadeRequest, opts ...grpc.CallOption) (*RegisterCascadeResponse, error) } diff --git a/sdk/net/client.go b/sdk/net/client.go index b069cf46..23a8cddd 100644 --- a/sdk/net/client.go +++ b/sdk/net/client.go @@ -11,8 +11,8 @@ import ( // SupernodeClient defines the interface for communicating with supernodes type SupernodeClient interface { - // UploadInputData uploads input data for cascade processing - UploadInputData(ctx context.Context, in *supernodeservice.UploadInputDataRequest, opts ...grpc.CallOption) (*supernodeservice.UploadInputDataResponse, error) + //RegisterCascade uploads input data to Supernode for processing cascade request + RegisterCascade(ctx context.Context, in *supernodeservice.RegisterCascadeRequest, opts ...grpc.CallOption) (*supernodeservice.RegisterCascadeResponse, error) // HealthCheck performs a health check on the supernode HealthCheck(ctx context.Context) (*grpc_health_v1.HealthCheckResponse, error) diff --git a/sdk/net/impl.go b/sdk/net/impl.go index 6e016634..726a4fd8 100644 --- a/sdk/net/impl.go +++ b/sdk/net/impl.go @@ -104,18 +104,18 @@ func NewSupernodeClient(ctx context.Context, logger log.Logger, keyring keyring. }, nil } -// UploadInputData sends data to the supernode for cascade processing -func (c *supernodeClient) UploadInputData(ctx context.Context, - in *supernodeservice.UploadInputDataRequest, opts ...grpc.CallOption, -) (*supernodeservice.UploadInputDataResponse, error) { +// RegisterCascade sends data to the supernode for cascade processing +func (c *supernodeClient) RegisterCascade(ctx context.Context, + in *supernodeservice.RegisterCascadeRequest, opts ...grpc.CallOption, +) (*supernodeservice.RegisterCascadeResponse, error) { - resp, err := c.cascadeClient.UploadInputData(ctx, in, opts...) + resp, err := c.cascadeClient.RegisterCascade(ctx, in, opts...) if err != nil { return nil, fmt.Errorf("upload input data failed: %w", err) } c.logger.Info(ctx, "Input data uploaded successfully", - "actionID", in.ActionID, "filename", in.Filename, "filePath", in.FilePath) + "actionID", in.ActionID, "filePath", in.FilePath) return resp, nil } diff --git a/sdk/task/cascade.go b/sdk/task/cascade.go index 7c059322..01f4a674 100644 --- a/sdk/task/cascade.go +++ b/sdk/task/cascade.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "path/filepath" "sync" "time" @@ -24,23 +23,14 @@ const ( type CascadeTask struct { BaseTask - FileHash string - FilePath string - SignedData string + FilePath string } // NewCascadeTask creates a new CascadeTask using a BaseTask plus cascade-specific parameters -func NewCascadeTask( - base BaseTask, - fileHash string, - filePath string, - signedData string, -) *CascadeTask { +func NewCascadeTask(base BaseTask, filePath string) *CascadeTask { return &CascadeTask{ - BaseTask: base, - FileHash: fileHash, - FilePath: filePath, - SignedData: signedData, + BaseTask: base, + FilePath: filePath, } } @@ -155,12 +145,10 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum } clientFactory := net.NewClientFactory(ctx, t.logger, t.keyring, factoryCfg) - req := &supernodeservice.UploadInputDataRequest{ - Filename: filepath.Base(t.FilePath), - ActionID: t.ActionID, - DataHash: t.FileHash, - SignedData: t.SignedData, - FilePath: t.FilePath, + req := &supernodeservice.RegisterCascadeRequest{ + ActionID: t.ActionID, + FilePath: t.FilePath, + TaskID: t.TaskID, } var lastErr error @@ -175,13 +163,10 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum return fmt.Errorf("failed to upload to all supernodes: %w", lastErr) } func (t *CascadeTask) attemptRegistration(ctx context.Context, index int, sn lumera.Supernode, - factory *net.ClientFactory, req *supernodeservice.UploadInputDataRequest) error { + factory *net.ClientFactory, req *supernodeservice.RegisterCascadeRequest) error { t.logEvent(ctx, event.TaskProgressRegistrationInProgress, "attempting registration with supernode", map[string]interface{}{ - "supernode": sn.GrpcEndpoint, - "sn-address": sn.CosmosAddress, - "iteration": index + 1, - }) + "supernode": sn.GrpcEndpoint, "sn-address": sn.CosmosAddress, "iteration": index + 1}) client, err := factory.CreateClient(ctx, sn) if err != nil { @@ -192,7 +177,7 @@ func (t *CascadeTask) attemptRegistration(ctx context.Context, index int, sn lum uploadCtx, cancel := context.WithTimeout(ctx, registrationTimeout) defer cancel() - resp, err := client.UploadInputData(uploadCtx, req) + resp, err := client.RegisterCascade(uploadCtx, req) if err != nil { return fmt.Errorf("upload to %s: %w", sn.CosmosAddress, err) } diff --git a/sdk/task/manager.go b/sdk/task/manager.go index 8712e80d..32e025f8 100644 --- a/sdk/task/manager.go +++ b/sdk/task/manager.go @@ -17,7 +17,7 @@ const MAX_EVENT_WORKERS = 100 // Manager handles task creation and management type Manager interface { - CreateCascadeTask(ctx context.Context, fileHash, actionID, filePath string, signedData string) (string, error) + CreateCascadeTask(ctx context.Context, actionID, filePath string) (string, error) GetTask(ctx context.Context, taskID string) (*TaskEntry, bool) DeleteTask(ctx context.Context, taskID string) error SubscribeToEvents(ctx context.Context, eventType event.EventType, handler event.Handler) @@ -77,16 +77,8 @@ func NewManager( } // CreateCascadeTask creates and starts a Cascade task using the new pattern -func (m *ManagerImpl) CreateCascadeTask( - ctx context.Context, - fileHash string, - actionID string, - filePath string, - signedData string, -) (string, error) { - m.logger.Info(ctx, "Creating cascade task", - "fileHash", fileHash, - "actionID", actionID) +func (m *ManagerImpl) CreateCascadeTask(ctx context.Context, actionID string, filePath string) (string, error) { + m.logger.Info(ctx, "Creating cascade task", "filePath", filePath, "actionID", actionID) // Generate task ID // slice this to 8 bytes @@ -105,7 +97,7 @@ func (m *ManagerImpl) CreateCascadeTask( logger: m.logger, } // Create cascade-specific task - task := NewCascadeTask(baseTask, fileHash, filePath, signedData) + task := NewCascadeTask(baseTask, filePath) // Store task in cache m.taskCache.Set(ctx, taskID, task, TaskTypeCascade) diff --git a/supernode/services/cascade/helper.go b/supernode/services/cascade/helper.go new file mode 100644 index 00000000..d909171a --- /dev/null +++ b/supernode/services/cascade/helper.go @@ -0,0 +1,225 @@ +package cascade + +import ( + "context" + + "strings" + + "github.com/LumeraProtocol/supernode/pkg/codec" + "github.com/LumeraProtocol/supernode/pkg/errors" + "github.com/LumeraProtocol/supernode/pkg/log" + "github.com/LumeraProtocol/supernode/pkg/logtrace" + "github.com/LumeraProtocol/supernode/pkg/lumera/modules/supernode" + "github.com/LumeraProtocol/supernode/pkg/utils" + "github.com/LumeraProtocol/supernode/supernode/services/common" + "github.com/golang/protobuf/proto" + json "github.com/json-iterator/go" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + actiontypes "github.com/LumeraProtocol/supernode/gen/lumera/action/types" +) + +func (task *CascadeRegistrationTask) fetchAction(ctx context.Context, actionID string, f logtrace.Fields) (*actiontypes.Action, error) { + res, err := task.lumeraClient.Action().GetAction(ctx, actionID) + if err != nil { + return nil, task.wrapErr(ctx, "failed to get action", err, f) + } + + if res.GetAction().ActionID == "" { + return nil, task.wrapErr(ctx, "action not found", errors.New(""), f) + } + logtrace.Info(ctx, "action has been retrieved", f) + + return res.GetAction(), nil +} + +func (task *CascadeRegistrationTask) ensureIsTopSupernode(ctx context.Context, blockHeight uint64, f logtrace.Fields) error { + top, err := task.lumeraClient.SuperNode().GetTopSuperNodesForBlock(ctx, blockHeight) + if err != nil { + return task.wrapErr(ctx, "failed to get top SNs", err, f) + } + logtrace.Info(ctx, "Fetched Top Supernodes", f) + + if !supernode.Exists(top.Supernodes, task.config.SupernodeAccountAddress) { + return task.wrapErr(ctx, "current supernode does not exist in the top SMs list", errors.New(""), f) + } + + return nil +} + +func (task *CascadeRegistrationTask) decodeCascadeMetadata(ctx context.Context, raw []byte, f logtrace.Fields) (actiontypes.CascadeMetadata, error) { + var meta actiontypes.CascadeMetadata + if err := proto.Unmarshal(raw, &meta); err != nil { + return meta, task.wrapErr(ctx, "failed to unmarshal cascade metadata", err, f) + } + return meta, nil +} + +func (task *CascadeRegistrationTask) verifyDataHash(ctx context.Context, data []byte, expected string, f logtrace.Fields) error { + dh, _ := utils.Sha3256hash(data) + b64 := utils.B64Encode(dh) + if string(b64) != expected { + return task.wrapErr(ctx, "data hash doesn't match", errors.New(""), f) + } + logtrace.Info(ctx, "request data-hash has been matched with the action data-hash", f) + + return nil +} + +func (task *CascadeRegistrationTask) encodeInput(ctx context.Context, data []byte, f logtrace.Fields) (*codec.EncodeResponse, error) { + resp, err := task.codec.Encode(ctx, codec.EncodeRequest{Data: data, TaskID: task.ID()}) + if err != nil { + return nil, task.wrapErr(ctx, "failed to encode data", err, f) + } + return &resp, nil +} + +func (task *CascadeRegistrationTask) verifySignatureAndDecodeLayout(ctx context.Context, encoded string, creator string, + encodedMeta codec.Layout, f logtrace.Fields) (codec.Layout, string, error) { + + file, sig, err := extractSignatureAndFirstPart(encoded) + if err != nil { + return codec.Layout{}, "", task.wrapErr(ctx, "failed to extract signature and first part", err, f) + } + + if err := task.lumeraClient.Auth().Verify(ctx, creator, []byte(file), []byte(sig)); err != nil { + return codec.Layout{}, "", task.wrapErr(ctx, "failed to verify signature", err, f) + } + + layout, err := decodeMetadataFile(file) + if err != nil { + return codec.Layout{}, "", task.wrapErr(ctx, "failed to decode metadata file", err, f) + } + + return layout, sig, nil +} + +func (task *CascadeRegistrationTask) generateRQIDFiles(ctx context.Context, meta actiontypes.CascadeMetadata, + sig, creator string, encodedMeta codec.Layout, f logtrace.Fields) (GenRQIdentifiersFilesResponse, error) { + res, err := GenRQIdentifiersFiles(ctx, GenRQIdentifiersFilesRequest{ + Metadata: encodedMeta, + CreatorSNAddress: creator, + RqMax: uint32(meta.RqIdsMax), + Signature: sig, + IC: uint32(meta.RqIdsIc), + }) + if err != nil { + return GenRQIdentifiersFilesResponse{}, + task.wrapErr(ctx, "failed to generate RQID Files", err, f) + } + logtrace.Info(ctx, "rq symbols, rq-ids and rqid-files have been generated", f) + return res, nil +} + +func (task *CascadeRegistrationTask) storeArtefacts(ctx context.Context, idFiles [][]byte, symbolsDir string, f logtrace.Fields) error { + logtrace.Info(ctx, "About to store ID files", logtrace.Fields{"taskID": task.ID(), "fileCount": len(idFiles)}) + + if err := task.storeIDFiles(ctx, idFiles); err != nil { + return task.wrapErr(ctx, "failed to store ID files", err, f) + } + logtrace.Info(ctx, "id files have been stored", f) + + if err := task.storeRaptorQSymbols(ctx, symbolsDir); err != nil { + return task.wrapErr(ctx, "error storing raptor-q symbols", err, f) + } + logtrace.Info(ctx, "raptor-q symbols have been stored", f) + + return nil +} + +func (task *CascadeRegistrationTask) wrapErr(ctx context.Context, msg string, err error, f logtrace.Fields) error { + if err != nil { + f[logtrace.FieldError] = err.Error() + } + logtrace.Error(ctx, msg, f) + + return status.Errorf(codes.Internal, msg) +} + +// extractSignatureAndFirstPart extracts the signature and first part from the encoded data +// data is expected to be in format: b64(JSON(Layout)).Signature +func extractSignatureAndFirstPart(data string) (encodedMetadata string, signature string, err error) { + parts := strings.Split(data, ".") + if len(parts) < 2 { + return "", "", errors.New("invalid data format") + } + + // The first part is the base64 encoded data + return parts[0], parts[1], nil +} + +func decodeMetadataFile(data string) (layout codec.Layout, err error) { + // Decode the base64 encoded data + decodedData, err := utils.B64Decode([]byte(data)) + if err != nil { + return layout, errors.Errorf("failed to decode data: %w", err) + } + + // Unmarshal the decoded data into a layout + if err := json.Unmarshal(decodedData, &layout); err != nil { + return layout, errors.Errorf("failed to unmarshal data: %w", err) + } + + return layout, nil +} + +func verifyIDs(ctx context.Context, ticketMetadata, metadata codec.Layout) error { + // Verify that the symbol identifiers match between versions + if err := utils.EqualStrList(ticketMetadata.Blocks[0].Symbols, metadata.Blocks[0].Symbols); err != nil { + return errors.Errorf("symbol identifiers don't match: %w", err) + } + + // Verify that the block hashes match + if ticketMetadata.Blocks[0].Hash != metadata.Blocks[0].Hash { + return errors.New("block hashes don't match") + } + + return nil +} + +// storeIDFiles stores ID files to P2P +func (task *CascadeRegistrationTask) storeIDFiles(ctx context.Context, metadataFiles [][]byte) error { + ctx = context.WithValue(ctx, log.TaskIDKey, task.ID()) + task.storage.TaskID = task.ID() + + // Log basic info before storing + logtrace.Info(ctx, "Storing ID files", logtrace.Fields{ + "taskID": task.ID(), + }) + + // Check if files exist + if len(metadataFiles) == 0 { + logtrace.Error(ctx, "No ID files to store", nil) + return errors.New("no ID files to store") + } + + // Store files with better error handling + if err := task.storage.StoreBatch(ctx, metadataFiles, common.P2PDataCascadeMetadata); err != nil { + logtrace.Error(ctx, "Store operation failed", logtrace.Fields{ + "error": err.Error(), + "fileCount": len(metadataFiles), + }) + return errors.Errorf("store ID files into kademlia: %w", err) + } + + logtrace.Info(ctx, "ID files stored successfully", nil) + return nil +} + +// storeRaptorQSymbols stores RaptorQ symbols to P2P +func (task *CascadeRegistrationTask) storeRaptorQSymbols(ctx context.Context, symbolsDir string) error { + // Add improved logging + logtrace.Info(ctx, "Storing RaptorQ symbols", logtrace.Fields{ + "taskID": task.ID(), + }) + + err := task.storage.StoreRaptorQSymbolsIntoP2P(ctx, task.ID(), symbolsDir) + if err != nil { + logtrace.Error(ctx, "Failed to store RaptorQ symbols", logtrace.Fields{ + "taskID": task.ID(), + "error": err.Error(), + }) + } + return err +} diff --git a/supernode/services/cascade/register.go b/supernode/services/cascade/register.go index d6dc6917..26df9873 100644 --- a/supernode/services/cascade/register.go +++ b/supernode/services/cascade/register.go @@ -2,22 +2,8 @@ package cascade import ( "context" - "encoding/hex" - "strings" - "github.com/LumeraProtocol/supernode/pkg/codec" - "github.com/LumeraProtocol/supernode/pkg/errors" - "github.com/LumeraProtocol/supernode/pkg/log" "github.com/LumeraProtocol/supernode/pkg/logtrace" - "github.com/LumeraProtocol/supernode/pkg/lumera/modules/supernode" - "github.com/LumeraProtocol/supernode/pkg/utils" - "github.com/LumeraProtocol/supernode/supernode/services/common" - json "github.com/json-iterator/go" - - actiontypes "github.com/LumeraProtocol/supernode/gen/lumera/action/types" - "github.com/golang/protobuf/proto" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) // RegisterRequest contains parameters for upload request @@ -33,226 +19,75 @@ type RegisterResponse struct { Message string } -// Register processes the upload request for cascade input data +// Register processes the upload request for cascade input data. +// 1- Fetch & validate action (it should be a cascade action registered on the chain) +// 2- Ensure this super-node is eligible to process the action (should be in the top supernodes list for the action block height) +// 3- Get the cascade metadata from the action: it contains the data hash and the signatures +// +// Assuming data hash is a base64 encoded string of blake3 hash of the data +// The signatures field is: b64(JSON(Layout)).Signature where Layout is codec.Layout +// The layout is a JSON object that contains the metadata of the data +// +// 4- Verify the data hash (the data hash should match the one in the action ticket) - again, hash function should be blake3 +// 5- Generate Symbols with codec (RQ-Go Library) (the data should be encoded using the codec) +// 6- Extract the layout and the signature from Step 3. Verify the signature using the creator's public key (creator address is in the action) +// 7- Generate RQ-ID files from the layout that we generated locally and then match those with the ones in the action +// 8- Verify the IDs in the layout and the metadata (the IDs should match the ones in the action) +// 9- Store the artefacts in P2P Storage (the redundant metadata files and the symbols from the symbols dir) func (task *CascadeRegistrationTask) Register(ctx context.Context, req *RegisterRequest) (*RegisterResponse, error) { - fields := logtrace.Fields{ - logtrace.FieldMethod: "Register", - logtrace.FieldRequest: req, - } - - // Get action details from Lumera - actionRes, err := task.lumeraClient.Action().GetAction(ctx, req.ActionID) - if err != nil { - fields[logtrace.FieldError] = err.Error() - logtrace.Error(ctx, "failed to get action", fields) - return nil, status.Errorf(codes.Internal, "failed to get action") - } - if actionRes.GetAction().ActionID == "" { - logtrace.Error(ctx, "action not found", fields) - return nil, status.Errorf(codes.Internal, "action not found") - } - actionDetails := actionRes.GetAction() - logtrace.Info(ctx, "action has been retrieved", fields) + fields := logtrace.Fields{logtrace.FieldMethod: "Register", logtrace.FieldRequest: req} - topSNsRes, err := task.lumeraClient.SuperNode().GetTopSuperNodesForBlock(ctx, uint64(actionDetails.BlockHeight)) + /* 1. Fetch & validate action -------------------------------------------------- */ + action, err := task.fetchAction(ctx, req.ActionID, fields) if err != nil { - fields[logtrace.FieldError] = err.Error() - logtrace.Error(ctx, "failed to get top SNs", fields) - return nil, status.Errorf(codes.Internal, "failed to get top SNs") - } - logtrace.Info(ctx, "top sns have been fetched", fields) - - // Verify current supernode is in the top list - if !supernode.Exists(topSNsRes.Supernodes, task.config.SupernodeAccountAddress) { - logtrace.Error(ctx, "current supernode do not exist in the top sns list", fields) - return nil, status.Errorf(codes.Internal, "current supernode does not exist in the top sns list") - } - logtrace.Info(ctx, "current supernode exists in the top sns list", fields) - - // Parse the action metadata to CascadeMetadata - var cascadeMetadata actiontypes.CascadeMetadata - if err := proto.Unmarshal(actionDetails.Metadata, &cascadeMetadata); err != nil { - fields[logtrace.FieldError] = err.Error() - logtrace.Error(ctx, "failed to unmarshal cascade metadata", fields) - return nil, status.Errorf(codes.Internal, "failed to unmarshal cascade metadata") - } - - // Verify data hash matches action metadata - dataHash, _ := utils.Sha3256hash(req.Data) - hash := utils.B64Encode(dataHash) - if hex.EncodeToString(hash) != cascadeMetadata.DataHash { - logtrace.Error(ctx, "data hash doesn't match", fields) - return nil, status.Errorf(codes.Internal, "data hash doesn't match") + return nil, err } - logtrace.Info(ctx, "request data-hash has been matched with the action data-hash", fields) - encodeRequest := codec.EncodeRequest{ - Data: req.Data, - TaskID: task.ID(), + /* 2. Ensure this super-node is eligible -------------------------------------- */ + if err := task.ensureIsTopSupernode(ctx, uint64(action.BlockHeight), fields); err != nil { + return nil, err } - resp, err := task.codec.Encode(ctx, encodeRequest) + /* 3. Decode cascade metadata -------------------------------------------------- */ + cascadeMeta, err := task.decodeCascadeMetadata(ctx, action.Metadata, fields) if err != nil { - fields[logtrace.FieldError] = err.Error() - logtrace.Error(ctx, "failed to encode data", fields) - return nil, status.Errorf(codes.Internal, "failed to encode data") + return nil, err } - encodedMetadata, signature, err := extractSignatureAndFirstPart(cascadeMetadata.Signatures) - if err != nil { - fields[logtrace.FieldError] = err.Error() - logtrace.Error(ctx, "failed to extract signature and first part", fields) - return nil, status.Errorf(codes.Internal, "failed to extract signature and first part") + /* 4. Verify data hash --------------------------------------------------------- */ + if err := task.verifyDataHash(ctx, req.Data, cascadeMeta.DataHash, fields); err != nil { + return nil, err } - // Verify signature against the encodedRQIDFile generated by the supernode - err = task.lumeraClient.Auth().Verify(ctx, actionDetails.GetCreator(), []byte(encodedMetadata), []byte(signature)) + /* 5. Encode the raw data ------------------------------------------------------ */ + encResp, err := task.encodeInput(ctx, req.Data, fields) if err != nil { - fields[logtrace.FieldError] = err.Error() - logtrace.Error(ctx, "failed to verify signature", fields) - return nil, status.Errorf(codes.Internal, "failed to verify signature") + return nil, err } - // Decode the metadata file - layout, err := decodeMetadataFile(encodedMetadata) + /* 6. Signature verification + layout decode ---------------------------------- */ + layout, signature, err := task.verifySignatureAndDecodeLayout( + ctx, cascadeMeta.Signatures, action.Creator, encResp.Metadata, fields, + ) if err != nil { - fields[logtrace.FieldError] = err.Error() - logtrace.Error(ctx, "failed to decode metadata file", fields) - return nil, status.Errorf(codes.Internal, "failed to decode metadata file") + return nil, err } - // Generate RaptorQ identifiers - res, err := GenRQIdentifiersFiles(ctx, GenRQIdentifiersFilesRequest{ - Metadata: resp.Metadata, - CreatorSNAddress: actionDetails.GetCreator(), - RqMax: uint32(cascadeMetadata.RqIdsMax), - Signature: signature, - IC: uint32(cascadeMetadata.RqIdsIc), - }) + /* 7. Generate RQ-ID files ----------------------------------------------------- */ + rqidResp, err := task.generateRQIDFiles(ctx, cascadeMeta, signature, action.Creator, encResp.Metadata, fields) if err != nil { - fields[logtrace.FieldError] = err.Error() - logtrace.Error(ctx, "failed to generate RQID Files", fields) - return nil, status.Errorf(codes.Internal, "failed to generate RQID Files") - } - logtrace.Info(ctx, "rq symbols, rq-ids and rqid-files have been generated", fields) - - // Verify that the symbol identifiers match between versions - if err := verifyIDs(ctx, layout, resp.Metadata); err != nil { - fields[logtrace.FieldError] = err.Error() - logtrace.Error(ctx, "failed to verify IDs", fields) - return nil, status.Errorf(codes.Internal, "failed to verify IDs") + return nil, err } - // About to store ID files - logtrace.Info(ctx, "About to store ID files", logtrace.Fields{ - "taskID": task.ID(), - "fileCount": 0, - }) - // Store ID files to P2P - if err = task.storeIDFiles(ctx, res.RedundantMetadataFiles); err != nil { - fields[logtrace.FieldError] = err.Error() - logtrace.Error(ctx, "failed to store ID files", fields) - return nil, status.Errorf(codes.Internal, "failed to store ID files") + /* 8. Consistency checks ------------------------------------------------------- */ + if err := verifyIDs(ctx, layout, encResp.Metadata); err != nil { + return nil, task.wrapErr(ctx, "failed to verify IDs", err, fields) } - logtrace.Info(ctx, "id files have been stored", fields) - // Store RaptorQ symbols - if err = task.storeRaptorQSymbols(ctx, resp.SymbolsDir); err != nil { - fields[logtrace.FieldError] = err.Error() - logtrace.Error(ctx, "error storing raptor-q symbols", fields) - return nil, status.Errorf(codes.Internal, "error storing raptor-q symbols") + /* 9. Persist artefacts -------------------------------------------------------- */ + if err := task.storeArtefacts(ctx, rqidResp.RedundantMetadataFiles, encResp.SymbolsDir, fields); err != nil { + return nil, err } - logtrace.Info(ctx, "raptor-q symbols have been stored", fields) - return &RegisterResponse{ - Success: true, - Message: "successfully uploaded input data", - }, nil -} - -// extractSignatureAndFirstPart extracts the signature and first part from the encoded data -// data is expected to be in format: b64(JSON(Layout)).Signature -func extractSignatureAndFirstPart(data string) (encodedMetadata string, signature string, err error) { - parts := strings.Split(data, ".") - if len(parts) < 2 { - return "", "", errors.New("invalid data format") - } - - // The first part is the base64 encoded data - return parts[0], parts[1], nil -} - -func decodeMetadataFile(data string) (layout codec.Layout, err error) { - // Decode the base64 encoded data - decodedData, err := utils.B64Decode([]byte(data)) - if err != nil { - return layout, errors.Errorf("failed to decode data: %w", err) - } - - // Unmarshal the decoded data into a layout - if err := json.Unmarshal(decodedData, &layout); err != nil { - return layout, errors.Errorf("failed to unmarshal data: %w", err) - } - - return layout, nil -} - -func verifyIDs(ctx context.Context, ticketMetadata, metadata codec.Layout) error { - // Verify that the symbol identifiers match between versions - if err := utils.EqualStrList(ticketMetadata.Blocks[0].Symbols, metadata.Blocks[0].Symbols); err != nil { - return errors.Errorf("symbol identifiers don't match: %w", err) - } - - // Verify that the block hashes match - if ticketMetadata.Blocks[0].Hash != metadata.Blocks[0].Hash { - return errors.New("block hashes don't match") - } - - return nil -} - -// storeIDFiles stores ID files to P2P -func (task *CascadeRegistrationTask) storeIDFiles(ctx context.Context, metadataFiles [][]byte) error { - ctx = context.WithValue(ctx, log.TaskIDKey, task.ID()) - task.storage.TaskID = task.ID() - - // Log basic info before storing - logtrace.Info(ctx, "Storing ID files", logtrace.Fields{ - "taskID": task.ID(), - }) - - // Check if files exist - if len(metadataFiles) == 0 { - logtrace.Error(ctx, "No ID files to store", nil) - return errors.New("no ID files to store") - } - - // Store files with better error handling - if err := task.storage.StoreBatch(ctx, metadataFiles, common.P2PDataCascadeMetadata); err != nil { - logtrace.Error(ctx, "Store operation failed", logtrace.Fields{ - "error": err.Error(), - "fileCount": len(metadataFiles), - }) - return errors.Errorf("store ID files into kademlia: %w", err) - } - - logtrace.Info(ctx, "ID files stored successfully", nil) - return nil -} - -// storeRaptorQSymbols stores RaptorQ symbols to P2P -func (task *CascadeRegistrationTask) storeRaptorQSymbols(ctx context.Context, symbolsDir string) error { - // Add improved logging - logtrace.Info(ctx, "Storing RaptorQ symbols", logtrace.Fields{ - "taskID": task.ID(), - }) - - err := task.storage.StoreRaptorQSymbolsIntoP2P(ctx, task.ID(), symbolsDir) - if err != nil { - logtrace.Error(ctx, "Failed to store RaptorQ symbols", logtrace.Fields{ - "taskID": task.ID(), - "error": err.Error(), - }) - } - return err + return &RegisterResponse{Success: true, Message: "successfully uploaded input data"}, nil }