From 0800dab5f69183c3c5c352ea16f2d215350bb0c8 Mon Sep 17 00:00:00 2001 From: j-rafique Date: Mon, 12 May 2025 13:50:56 +0500 Subject: [PATCH] refactor and cleans up supernode code --- go.mod | 1 + pkg/logtrace/fields.go | 2 + .../server/cascade/cascade_action_server.go | 21 ++- supernode/services/cascade/adaptors/lumera.go | 56 ++++++ .../cascade/adaptors/mocks/lumera_mock.go | 112 ++++++++++++ .../cascade/adaptors/mocks/p2p_mock.go | 51 ++++++ .../cascade/adaptors/mocks/rq_mock.go | 51 ++++++ supernode/services/cascade/adaptors/p2p.go | 168 ++++++++++++++++++ supernode/services/cascade/adaptors/rq.go | 46 +++++ supernode/services/cascade/config.go | 5 - supernode/services/cascade/helper.go | 89 ++-------- supernode/services/cascade/helper_test.go | 3 +- supernode/services/cascade/interfaces.go | 17 ++ .../cascade/mocks/cascade_interfaces_mock.go | 87 +++++++++ supernode/services/cascade/register.go | 8 +- supernode/services/cascade/service.go | 25 ++- supernode/services/cascade/task.go | 1 - supernode/services/common/p2p.go | 10 -- 18 files changed, 635 insertions(+), 118 deletions(-) create mode 100644 supernode/services/cascade/adaptors/lumera.go create mode 100644 supernode/services/cascade/adaptors/mocks/lumera_mock.go create mode 100644 supernode/services/cascade/adaptors/mocks/p2p_mock.go create mode 100644 supernode/services/cascade/adaptors/mocks/rq_mock.go create mode 100644 supernode/services/cascade/adaptors/p2p.go create mode 100644 supernode/services/cascade/adaptors/rq.go create mode 100644 supernode/services/cascade/interfaces.go create mode 100644 supernode/services/cascade/mocks/cascade_interfaces_mock.go diff --git a/go.mod b/go.mod index 672c71ed..c8be9e1a 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,7 @@ module github.com/LumeraProtocol/supernode go 1.24.1 +replace "github.com/LumeraProtocol/supernode/supernode" => ./supernode require ( cosmossdk.io/math v1.5.3 github.com/LumeraProtocol/lumera v0.4.5 diff --git a/pkg/logtrace/fields.go b/pkg/logtrace/fields.go index c3f0e9fe..0ef9be21 100644 --- a/pkg/logtrace/fields.go +++ b/pkg/logtrace/fields.go @@ -16,4 +16,6 @@ const ( FieldRequest = "request" FieldStackTrace = "stack_trace" FieldTxHash = "tx_hash" + FieldTaskID = "task_id" + FieldActionID = "action_id" ) diff --git a/supernode/node/action/server/cascade/cascade_action_server.go b/supernode/node/action/server/cascade/cascade_action_server.go index 8293a47f..4deca38b 100644 --- a/supernode/node/action/server/cascade/cascade_action_server.go +++ b/supernode/node/action/server/cascade/cascade_action_server.go @@ -11,21 +11,21 @@ import ( "google.golang.org/grpc" ) -type CascadeActionServer struct { +type ActionServer struct { pb.UnimplementedCascadeServiceServer - service *cascadeService.CascadeService + factory cascadeService.TaskFactory } -func NewCascadeActionServer(service *cascadeService.CascadeService) *CascadeActionServer { - return &CascadeActionServer{ - service: service, - } +// NewCascadeActionServer creates a new CascadeActionServer with injected service +func NewCascadeActionServer(factory cascadeService.TaskFactory) *ActionServer { + return &ActionServer{factory: factory} } -func (server *CascadeActionServer) Desc() *grpc.ServiceDesc { +func (server *ActionServer) Desc() *grpc.ServiceDesc { return &pb.CascadeService_ServiceDesc } -func (server *CascadeActionServer) Register(stream pb.CascadeService_RegisterServer) error { + +func (server *ActionServer) Register(stream pb.CascadeService_RegisterServer) error { fields := logtrace.Fields{ logtrace.FieldMethod: "Register", logtrace.FieldModule: "CascadeActionServer", @@ -77,9 +77,12 @@ func (server *CascadeActionServer) Register(stream pb.CascadeService_RegisterSer logtrace.Error(ctx, "no metadata received in stream", fields) return fmt.Errorf("no metadata received") } + fields[logtrace.FieldTaskID] = metadata.GetTaskId() + fields[logtrace.FieldActionID] = metadata.GetActionId() + logtrace.Info(ctx, "metadata received from action-sdk", fields) // Process the complete data - task := server.service.NewCascadeRegistrationTask() + task := server.factory.NewCascadeRegistrationTask() err := task.Register(ctx, &cascadeService.RegisterRequest{ TaskID: metadata.TaskId, ActionID: metadata.ActionId, diff --git a/supernode/services/cascade/adaptors/lumera.go b/supernode/services/cascade/adaptors/lumera.go new file mode 100644 index 00000000..fb79bae1 --- /dev/null +++ b/supernode/services/cascade/adaptors/lumera.go @@ -0,0 +1,56 @@ +package adaptors + +import ( + "context" + actiontypes "github.com/LumeraProtocol/lumera/x/action/types" + sntypes "github.com/LumeraProtocol/lumera/x/supernode/types" + "github.com/LumeraProtocol/supernode/pkg/lumera" + "github.com/LumeraProtocol/supernode/pkg/lumera/modules/action_msg" +) + +//go:generate mockgen -destination=mocks/lumera_mock.go -package=cascadeadaptormocks -source=lumera.go + +// LumeraClient defines the interface for interacting with Lumera chain data during cascade registration. +type LumeraClient interface { + // SupernodeModule + GetTopSupernodes(ctx context.Context, height uint64) (*sntypes.QueryGetTopSuperNodesForBlockResponse, error) + + // Action Module + GetAction(ctx context.Context, actionID string) (*actiontypes.QueryGetActionResponse, error) + GetActionParams(ctx context.Context) (*actiontypes.QueryParamsResponse, error) + FinalizeAction(ctx context.Context, actionID string, rqids []string) (*action_msg.FinalizeActionResult, error) + + // Auth + Verify(ctx context.Context, creator string, file []byte, sigBytes []byte) error +} + +// Client is the concrete implementation used in production. +type Client struct { + lc lumera.Client +} + +func NewLumeraClient(client lumera.Client) LumeraClient { + return &Client{ + lc: client, + } +} + +func (c *Client) GetAction(ctx context.Context, actionID string) (*actiontypes.QueryGetActionResponse, error) { + return c.lc.Action().GetAction(ctx, actionID) +} + +func (c *Client) GetActionParams(ctx context.Context) (*actiontypes.QueryParamsResponse, error) { + return c.lc.Action().GetParams(ctx) +} + +func (c *Client) FinalizeAction(ctx context.Context, actionID string, rqids []string) (*action_msg.FinalizeActionResult, error) { + return c.lc.ActionMsg().FinalizeCascadeAction(ctx, actionID, rqids) +} + +func (c *Client) GetTopSupernodes(ctx context.Context, height uint64) (*sntypes.QueryGetTopSuperNodesForBlockResponse, error) { + return c.lc.SuperNode().GetTopSuperNodesForBlock(ctx, height) +} + +func (c *Client) Verify(ctx context.Context, creator string, file []byte, sigBytes []byte) error { + return c.lc.Auth().Verify(ctx, creator, file, sigBytes) +} diff --git a/supernode/services/cascade/adaptors/mocks/lumera_mock.go b/supernode/services/cascade/adaptors/mocks/lumera_mock.go new file mode 100644 index 00000000..67943f75 --- /dev/null +++ b/supernode/services/cascade/adaptors/mocks/lumera_mock.go @@ -0,0 +1,112 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: lumera.go + +// Package cascadeadaptormocks is a generated GoMock package. +package cascadeadaptormocks + +import ( + context "context" + reflect "reflect" + + types "github.com/LumeraProtocol/lumera/x/action/types" + types0 "github.com/LumeraProtocol/lumera/x/supernode/types" + action_msg "github.com/LumeraProtocol/supernode/pkg/lumera/modules/action_msg" + gomock "github.com/golang/mock/gomock" +) + +// MockLumeraClient is a mock of LumeraClient interface. +type MockLumeraClient struct { + ctrl *gomock.Controller + recorder *MockLumeraClientMockRecorder +} + +// MockLumeraClientMockRecorder is the mock recorder for MockLumeraClient. +type MockLumeraClientMockRecorder struct { + mock *MockLumeraClient +} + +// NewMockLumeraClient creates a new mock instance. +func NewMockLumeraClient(ctrl *gomock.Controller) *MockLumeraClient { + mock := &MockLumeraClient{ctrl: ctrl} + mock.recorder = &MockLumeraClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockLumeraClient) EXPECT() *MockLumeraClientMockRecorder { + return m.recorder +} + +// FinalizeAction mocks base method. +func (m *MockLumeraClient) FinalizeAction(ctx context.Context, actionID string, rqids []string) (*action_msg.FinalizeActionResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FinalizeAction", ctx, actionID, rqids) + ret0, _ := ret[0].(*action_msg.FinalizeActionResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FinalizeAction indicates an expected call of FinalizeAction. +func (mr *MockLumeraClientMockRecorder) FinalizeAction(ctx, actionID, rqids interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FinalizeAction", reflect.TypeOf((*MockLumeraClient)(nil).FinalizeAction), ctx, actionID, rqids) +} + +// GetAction mocks base method. +func (m *MockLumeraClient) GetAction(ctx context.Context, actionID string) (*types.QueryGetActionResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAction", ctx, actionID) + ret0, _ := ret[0].(*types.QueryGetActionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetAction indicates an expected call of GetAction. +func (mr *MockLumeraClientMockRecorder) GetAction(ctx, actionID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAction", reflect.TypeOf((*MockLumeraClient)(nil).GetAction), ctx, actionID) +} + +// GetActionParams mocks base method. +func (m *MockLumeraClient) GetActionParams(ctx context.Context) (*types.QueryParamsResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetActionParams", ctx) + ret0, _ := ret[0].(*types.QueryParamsResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetActionParams indicates an expected call of GetActionParams. +func (mr *MockLumeraClientMockRecorder) GetActionParams(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetActionParams", reflect.TypeOf((*MockLumeraClient)(nil).GetActionParams), ctx) +} + +// GetTopSupernodes mocks base method. +func (m *MockLumeraClient) GetTopSupernodes(ctx context.Context, height uint64) (*types0.QueryGetTopSuperNodesForBlockResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTopSupernodes", ctx, height) + ret0, _ := ret[0].(*types0.QueryGetTopSuperNodesForBlockResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetTopSupernodes indicates an expected call of GetTopSupernodes. +func (mr *MockLumeraClientMockRecorder) GetTopSupernodes(ctx, height interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTopSupernodes", reflect.TypeOf((*MockLumeraClient)(nil).GetTopSupernodes), ctx, height) +} + +// Verify mocks base method. +func (m *MockLumeraClient) Verify(ctx context.Context, creator string, file, sigBytes []byte) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Verify", ctx, creator, file, sigBytes) + ret0, _ := ret[0].(error) + return ret0 +} + +// Verify indicates an expected call of Verify. +func (mr *MockLumeraClientMockRecorder) Verify(ctx, creator, file, sigBytes interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Verify", reflect.TypeOf((*MockLumeraClient)(nil).Verify), ctx, creator, file, sigBytes) +} diff --git a/supernode/services/cascade/adaptors/mocks/p2p_mock.go b/supernode/services/cascade/adaptors/mocks/p2p_mock.go new file mode 100644 index 00000000..344647ee --- /dev/null +++ b/supernode/services/cascade/adaptors/mocks/p2p_mock.go @@ -0,0 +1,51 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: p2p.go + +// Package cascadeadaptormocks is a generated GoMock package. +package cascadeadaptormocks + +import ( + context "context" + reflect "reflect" + + logtrace "github.com/LumeraProtocol/supernode/pkg/logtrace" + adaptors "github.com/LumeraProtocol/supernode/supernode/services/cascade/adaptors" + gomock "github.com/golang/mock/gomock" +) + +// MockP2PService is a mock of P2PService interface. +type MockP2PService struct { + ctrl *gomock.Controller + recorder *MockP2PServiceMockRecorder +} + +// MockP2PServiceMockRecorder is the mock recorder for MockP2PService. +type MockP2PServiceMockRecorder struct { + mock *MockP2PService +} + +// NewMockP2PService creates a new mock instance. +func NewMockP2PService(ctrl *gomock.Controller) *MockP2PService { + mock := &MockP2PService{ctrl: ctrl} + mock.recorder = &MockP2PServiceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockP2PService) EXPECT() *MockP2PServiceMockRecorder { + return m.recorder +} + +// StoreArtefacts mocks base method. +func (m *MockP2PService) StoreArtefacts(ctx context.Context, req adaptors.StoreArtefactsRequest, f logtrace.Fields) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StoreArtefacts", ctx, req, f) + ret0, _ := ret[0].(error) + return ret0 +} + +// StoreArtefacts indicates an expected call of StoreArtefacts. +func (mr *MockP2PServiceMockRecorder) StoreArtefacts(ctx, req, f interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StoreArtefacts", reflect.TypeOf((*MockP2PService)(nil).StoreArtefacts), ctx, req, f) +} diff --git a/supernode/services/cascade/adaptors/mocks/rq_mock.go b/supernode/services/cascade/adaptors/mocks/rq_mock.go new file mode 100644 index 00000000..306f66f7 --- /dev/null +++ b/supernode/services/cascade/adaptors/mocks/rq_mock.go @@ -0,0 +1,51 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: rq.go + +// Package cascadeadaptormocks is a generated GoMock package. +package cascadeadaptormocks + +import ( + context "context" + reflect "reflect" + + adaptors "github.com/LumeraProtocol/supernode/supernode/services/cascade/adaptors" + gomock "github.com/golang/mock/gomock" +) + +// MockCodecService is a mock of CodecService interface. +type MockCodecService struct { + ctrl *gomock.Controller + recorder *MockCodecServiceMockRecorder +} + +// MockCodecServiceMockRecorder is the mock recorder for MockCodecService. +type MockCodecServiceMockRecorder struct { + mock *MockCodecService +} + +// NewMockCodecService creates a new mock instance. +func NewMockCodecService(ctrl *gomock.Controller) *MockCodecService { + mock := &MockCodecService{ctrl: ctrl} + mock.recorder = &MockCodecServiceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockCodecService) EXPECT() *MockCodecServiceMockRecorder { + return m.recorder +} + +// EncodeInput mocks base method. +func (m *MockCodecService) EncodeInput(ctx context.Context, taskID string, data []byte) (adaptors.EncodeResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "EncodeInput", ctx, taskID, data) + ret0, _ := ret[0].(adaptors.EncodeResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// EncodeInput indicates an expected call of EncodeInput. +func (mr *MockCodecServiceMockRecorder) EncodeInput(ctx, taskID, data interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EncodeInput", reflect.TypeOf((*MockCodecService)(nil).EncodeInput), ctx, taskID, data) +} diff --git a/supernode/services/cascade/adaptors/p2p.go b/supernode/services/cascade/adaptors/p2p.go new file mode 100644 index 00000000..cd76cfe0 --- /dev/null +++ b/supernode/services/cascade/adaptors/p2p.go @@ -0,0 +1,168 @@ +package adaptors + +import ( + "context" + "fmt" + "io/fs" + "math" + "math/rand/v2" + "path/filepath" + "sort" + "strings" + "time" + + "github.com/LumeraProtocol/supernode/p2p" + "github.com/LumeraProtocol/supernode/pkg/log" + "github.com/LumeraProtocol/supernode/pkg/logtrace" + "github.com/LumeraProtocol/supernode/pkg/storage/rqstore" + "github.com/LumeraProtocol/supernode/pkg/utils" + "github.com/LumeraProtocol/supernode/supernode/services/common" + "github.com/pkg/errors" +) + +const ( + loadSymbolsBatchSize = 2500 + storeSymbolsPercent = 10 +) + +// P2PService defines the interface for storing data in the P2P layer. +// +//go:generate mockgen -destination=mocks/p2p_mock.go -package=cascadeadaptormocks -source=p2p.go +type P2PService interface { + StoreArtefacts(ctx context.Context, req StoreArtefactsRequest, f logtrace.Fields) error +} + +// p2pImpl is the default implementation of the P2PService interface. +type p2pImpl struct { + p2p p2p.Client + rqStore rqstore.Store +} + +// NewP2PService returns a concrete implementation of P2PService. +func NewP2PService(client p2p.Client, store rqstore.Store) P2PService { + return &p2pImpl{p2p: client, rqStore: store} +} + +type StoreArtefactsRequest struct { + TaskID string + ActionID string + IDFiles [][]byte + SymbolsDir string +} + +func (p *p2pImpl) StoreArtefacts(ctx context.Context, req StoreArtefactsRequest, f logtrace.Fields) error { + logtrace.Info(ctx, "About to store ID files", logtrace.Fields{"taskID": req.TaskID, "fileCount": len(req.IDFiles)}) + + if err := p.storeCascadeMetadata(ctx, req.IDFiles, req.TaskID); err != nil { + return errors.Wrap(err, "failed to store ID files") + } + logtrace.Info(ctx, "id files have been stored", f) + + if err := p.storeCascadeSymbols(ctx, req.TaskID, req.ActionID, req.SymbolsDir); err != nil { + return errors.Wrap(err, "error storing raptor-q symbols") + } + logtrace.Info(ctx, "raptor-q symbols have been stored", f) + + return nil +} + +func (p *p2pImpl) storeCascadeMetadata(ctx context.Context, metadataFiles [][]byte, taskID string) error { + logtrace.Info(ctx, "Storing cascade metadata", logtrace.Fields{ + "taskID": taskID, + "fileCount": len(metadataFiles), + }) + + return p.p2p.StoreBatch(ctx, metadataFiles, common.P2PDataCascadeMetadata, taskID) +} + +func (p *p2pImpl) storeCascadeSymbols(ctx context.Context, taskID, actionID string, symbolsDir string) error { + /* record directory in DB */ + if err := p.rqStore.StoreSymbolDirectory(taskID, symbolsDir); err != nil { + return fmt.Errorf("store symbol dir: %w", err) + } + + /* gather every symbol path under symbolsDir ------------------------- */ + keys, err := walkSymbolTree(symbolsDir) + if err != nil { + return err + } + + /* down-sample if we exceed the “big directory” threshold ------------- */ + if len(keys) > loadSymbolsBatchSize { + want := int(math.Ceil(float64(len(keys)) * storeSymbolsPercent / 100)) + if want < len(keys) { + rand.Shuffle(len(keys), func(i, j int) { keys[i], keys[j] = keys[j], keys[i] }) + keys = keys[:want] + } + sort.Strings(keys) // deterministic order inside the sample + } + + log.WithContext(ctx).WithField("count", len(keys)).Info("storing RaptorQ symbols") + + /* stream in fixed-size batches -------------------------------------- */ + for start := 0; start < len(keys); { + end := start + loadSymbolsBatchSize + if end > len(keys) { + end = len(keys) + } + if err := p.storeSymbolsInP2P(ctx, taskID, symbolsDir, keys[start:end]); err != nil { + return err + } + start = end + } + + if err := p.rqStore.UpdateIsFirstBatchStored(actionID); err != nil { + return fmt.Errorf("update first-batch flag: %w", err) + } + log.WithContext(ctx).WithField("curr-time", time.Now().UTC()).WithField("count", len(keys)). + Info("finished storing RaptorQ symbols") + + return nil +} + +func walkSymbolTree(root string) ([]string, error) { + var keys []string + err := filepath.WalkDir(root, func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err // propagate I/O errors + } + if d.IsDir() { + return nil // skip directory nodes + } + // ignore layout json if present + if strings.EqualFold(filepath.Ext(d.Name()), ".json") { + return nil + } + rel, err := filepath.Rel(root, path) + if err != nil { + return err + } + keys = append(keys, rel) // store as "block_0/filename" + return nil + }) + if err != nil { + return nil, fmt.Errorf("walk symbol tree: %w", err) + } + return keys, nil +} + +func (c *p2pImpl) storeSymbolsInP2P(ctx context.Context, taskID, root string, fileKeys []string) error { + log.WithContext(ctx).WithField("count", len(fileKeys)).Info("loading batch symbols") + + symbols, err := utils.LoadSymbols(root, fileKeys) + if err != nil { + return fmt.Errorf("load symbols: %w", err) + } + + if err := c.p2p.StoreBatch(ctx, symbols, common.P2PDataRaptorQSymbol, taskID); err != nil { + return fmt.Errorf("p2p store batch: %w", err) + } + log.WithContext(ctx).WithField("count", len(symbols)).Info("stored batch symbols") + + if err := utils.DeleteSymbols(ctx, root, fileKeys); err != nil { + return fmt.Errorf("delete symbols: %w", err) + } + log.WithContext(ctx).WithField("count", len(symbols)).Info("deleted batch symbols") + + return nil +} diff --git a/supernode/services/cascade/adaptors/rq.go b/supernode/services/cascade/adaptors/rq.go new file mode 100644 index 00000000..ed0d4f2d --- /dev/null +++ b/supernode/services/cascade/adaptors/rq.go @@ -0,0 +1,46 @@ +package adaptors + +import ( + "context" + + "github.com/LumeraProtocol/supernode/pkg/codec" +) + +// CodecService defines the interface for RaptorQ encoding of input data. +// +//go:generate mockgen -destination=mocks/rq_mock.go -package=cascadeadaptormocks -source=rq.go +type CodecService interface { + EncodeInput(ctx context.Context, taskID string, data []byte) (EncodeResult, error) +} + +// EncodeResult represents the outcome of encoding the input data. +type EncodeResult struct { + SymbolsDir string + Metadata codec.Layout +} + +// codecImpl is the default implementation using the real codec service. +type codecImpl struct { + codec codec.Codec +} + +// NewCodecService creates a new production instance of CodecService. +func NewCodecService(codec codec.Codec) CodecService { + return &codecImpl{codec: codec} +} + +// EncodeInput encodes the provided data and returns symbols and metadata. +func (c *codecImpl) EncodeInput(ctx context.Context, taskID string, data []byte) (EncodeResult, error) { + resp, err := c.codec.Encode(ctx, codec.EncodeRequest{ + TaskID: taskID, + Data: data, + }) + if err != nil { + return EncodeResult{}, err + } + + return EncodeResult{ + SymbolsDir: resp.SymbolsDir, + Metadata: resp.Metadata, + }, nil +} diff --git a/supernode/services/cascade/config.go b/supernode/services/cascade/config.go index ac91558c..c4e182ac 100644 --- a/supernode/services/cascade/config.go +++ b/supernode/services/cascade/config.go @@ -11,8 +11,3 @@ type Config struct { RaptorQServiceAddress string `mapstructure:"-" json:"-"` RqFilesDir string `mapstructure:"rq_files_dir" json:"rq_files_dir,omitempty"` } - -// NewConfig returns a new Config instance with default values -func NewConfig() *Config { - return &Config{} -} diff --git a/supernode/services/cascade/helper.go b/supernode/services/cascade/helper.go index 9d04d623..07b29739 100644 --- a/supernode/services/cascade/helper.go +++ b/supernode/services/cascade/helper.go @@ -4,28 +4,26 @@ import ( "context" "encoding/base64" "fmt" - "strings" "cosmossdk.io/math" + actiontypes "github.com/LumeraProtocol/lumera/x/action/types" "github.com/LumeraProtocol/supernode/pkg/codec" "github.com/LumeraProtocol/supernode/pkg/errors" - "github.com/LumeraProtocol/supernode/pkg/log" "github.com/LumeraProtocol/supernode/pkg/logtrace" "github.com/LumeraProtocol/supernode/pkg/lumera/modules/supernode" "github.com/LumeraProtocol/supernode/pkg/utils" - "github.com/LumeraProtocol/supernode/supernode/services/common" + "github.com/LumeraProtocol/supernode/supernode/services/cascade/adaptors" + sdk "github.com/cosmos/cosmos-sdk/types" "github.com/golang/protobuf/proto" json "github.com/json-iterator/go" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - - actiontypes "github.com/LumeraProtocol/lumera/x/action/types" ) func (task *CascadeRegistrationTask) fetchAction(ctx context.Context, actionID string, f logtrace.Fields) (*actiontypes.Action, error) { - res, err := task.lumeraClient.Action().GetAction(ctx, actionID) + res, err := task.lumeraClient.GetAction(ctx, actionID) if err != nil { return nil, task.wrapErr(ctx, "failed to get action", err, f) } @@ -39,7 +37,7 @@ func (task *CascadeRegistrationTask) fetchAction(ctx context.Context, actionID s } func (task *CascadeRegistrationTask) ensureIsTopSupernode(ctx context.Context, blockHeight uint64, f logtrace.Fields) error { - top, err := task.lumeraClient.SuperNode().GetTopSuperNodesForBlock(ctx, blockHeight) + top, err := task.lumeraClient.GetTopSupernodes(ctx, blockHeight) if err != nil { return task.wrapErr(ctx, "failed to get top SNs", err, f) } @@ -81,8 +79,8 @@ func (task *CascadeRegistrationTask) verifyDataHash(ctx context.Context, data [] return nil } -func (task *CascadeRegistrationTask) encodeInput(ctx context.Context, data []byte, f logtrace.Fields) (*codec.EncodeResponse, error) { - resp, err := task.codec.Encode(ctx, codec.EncodeRequest{Data: data, TaskID: task.ID()}) +func (task *CascadeRegistrationTask) encodeInput(ctx context.Context, data []byte, f logtrace.Fields) (*adaptors.EncodeResult, error) { + resp, err := task.rq.EncodeInput(ctx, task.ID(), data) if err != nil { return nil, task.wrapErr(ctx, "failed to encode data", err, f) } @@ -111,7 +109,7 @@ func (task *CascadeRegistrationTask) verifySignatureAndDecodeLayout(ctx context. }) // Pass the decoded signature bytes for verification - if err := task.lumeraClient.Auth().Verify(ctx, creator, []byte(file), sigBytes); err != nil { + if err := task.lumeraClient.Verify(ctx, creator, []byte(file), sigBytes); err != nil { return codec.Layout{}, "", task.wrapErr(ctx, "failed to verify node creator signature", err, f) } @@ -142,20 +140,13 @@ func (task *CascadeRegistrationTask) generateRQIDFiles(ctx context.Context, meta return res, nil } -func (task *CascadeRegistrationTask) storeArtefacts(ctx context.Context, idFiles [][]byte, symbolsDir string, f logtrace.Fields) error { - logtrace.Info(ctx, "About to store ID files", logtrace.Fields{"taskID": task.ID(), "fileCount": len(idFiles)}) - - if err := task.storeIDFiles(ctx, idFiles); err != nil { - return task.wrapErr(ctx, "failed to store ID files", err, f) - } - logtrace.Info(ctx, "id files have been stored", f) - - if err := task.storeRaptorQSymbols(ctx, symbolsDir); err != nil { - return task.wrapErr(ctx, "error storing raptor-q symbols", err, f) - } - logtrace.Info(ctx, "raptor-q symbols have been stored", f) - - return nil +func (task *CascadeRegistrationTask) storeArtefacts(ctx context.Context, actionID string, idFiles [][]byte, symbolsDir string, f logtrace.Fields) error { + return task.p2p.StoreArtefacts(ctx, adaptors.StoreArtefactsRequest{ + IDFiles: idFiles, + SymbolsDir: symbolsDir, + TaskID: task.ID(), + ActionID: actionID, + }, f) } func (task *CascadeRegistrationTask) wrapErr(ctx context.Context, msg string, err error, f logtrace.Fields) error { @@ -194,7 +185,7 @@ func decodeMetadataFile(data string) (layout codec.Layout, err error) { return layout, nil } -func verifyIDs(ctx context.Context, ticketMetadata, metadata codec.Layout) error { +func verifyIDs(ticketMetadata, metadata codec.Layout) error { // Verify that the symbol identifiers match between versions if err := utils.EqualStrList(ticketMetadata.Blocks[0].Symbols, metadata.Blocks[0].Symbols); err != nil { return errors.Errorf("symbol identifiers don't match: %w", err) @@ -208,57 +199,11 @@ func verifyIDs(ctx context.Context, ticketMetadata, metadata codec.Layout) error return nil } -// storeIDFiles stores ID files to P2P -func (task *CascadeRegistrationTask) storeIDFiles(ctx context.Context, metadataFiles [][]byte) error { - ctx = context.WithValue(ctx, log.TaskIDKey, task.ID()) - task.storage.TaskID = task.ID() - - // Log basic info before storing - logtrace.Info(ctx, "Storing ID files", logtrace.Fields{ - "taskID": task.ID(), - }) - - // Check if files exist - if len(metadataFiles) == 0 { - logtrace.Error(ctx, "No ID files to store", nil) - return errors.New("no ID files to store") - } - - // Store files with better error handling - if err := task.storage.StoreBatch(ctx, metadataFiles, common.P2PDataCascadeMetadata); err != nil { - logtrace.Error(ctx, "Store operation failed", logtrace.Fields{ - "error": err.Error(), - "fileCount": len(metadataFiles), - }) - return errors.Errorf("store ID files into kademlia: %w", err) - } - - logtrace.Info(ctx, "ID files stored successfully", nil) - return nil -} - -// storeRaptorQSymbols stores RaptorQ symbols to P2P -func (task *CascadeRegistrationTask) storeRaptorQSymbols(ctx context.Context, symbolsDir string) error { - // Add improved logging - logtrace.Info(ctx, "Storing RaptorQ symbols", logtrace.Fields{ - "taskID": task.ID(), - }) - - err := task.storage.StoreRaptorQSymbolsIntoP2P(ctx, task.ID(), symbolsDir) - if err != nil { - logtrace.Error(ctx, "Failed to store RaptorQ symbols", logtrace.Fields{ - "taskID": task.ID(), - "error": err.Error(), - }) - } - return err -} - // verifyActionFee checks if the action fee is sufficient for the given data size // It fetches action parameters, calculates the required fee, and compares it with the action price func (task *CascadeRegistrationTask) verifyActionFee(ctx context.Context, action *actiontypes.Action, data []byte, fields logtrace.Fields) error { // Fetch action parameters - params, err := task.lumeraClient.Action().GetParams(ctx) + params, err := task.lumeraClient.GetActionParams(ctx) if err != nil { return task.wrapErr(ctx, "failed to get action parameters", err, fields) } diff --git a/supernode/services/cascade/helper_test.go b/supernode/services/cascade/helper_test.go index 2f17f89a..1f93fac7 100644 --- a/supernode/services/cascade/helper_test.go +++ b/supernode/services/cascade/helper_test.go @@ -1,7 +1,6 @@ package cascade import ( - "context" "encoding/json" "testing" @@ -107,7 +106,7 @@ func Test_verifyIDs(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - err := verifyIDs(context.Background(), tt.ticket, tt.metadata) + err := verifyIDs(tt.ticket, tt.metadata) if tt.expectErr != "" { assert.ErrorContains(t, err, tt.expectErr) } else { diff --git a/supernode/services/cascade/interfaces.go b/supernode/services/cascade/interfaces.go new file mode 100644 index 00000000..930695f4 --- /dev/null +++ b/supernode/services/cascade/interfaces.go @@ -0,0 +1,17 @@ +package cascade + +import ( + "context" +) + +// TaskFactory defines an interface to create a new cascade registration task +// +//go:generate mockgen -destination=mocks/cascade_interfaces_mock.go -package=cascademocks -source=interfaces.go +type TaskFactory interface { + NewCascadeRegistrationTask() RegistrationTaskService +} + +// RegistrationTaskService interface allows to register a new cascade +type RegistrationTaskService interface { + Register(ctx context.Context, req *RegisterRequest, send func(resp *RegisterResponse) error) error +} diff --git a/supernode/services/cascade/mocks/cascade_interfaces_mock.go b/supernode/services/cascade/mocks/cascade_interfaces_mock.go new file mode 100644 index 00000000..3b0bb625 --- /dev/null +++ b/supernode/services/cascade/mocks/cascade_interfaces_mock.go @@ -0,0 +1,87 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: interfaces.go + +// Package cascademocks is a generated GoMock package. +package cascademocks + +import ( + context "context" + reflect "reflect" + + cascade "github.com/LumeraProtocol/supernode/supernode/services/cascade" + gomock "github.com/golang/mock/gomock" +) + +// MockTaskFactory is a mock of TaskFactory interface. +type MockTaskFactory struct { + ctrl *gomock.Controller + recorder *MockTaskFactoryMockRecorder +} + +// MockTaskFactoryMockRecorder is the mock recorder for MockTaskFactory. +type MockTaskFactoryMockRecorder struct { + mock *MockTaskFactory +} + +// NewMockTaskFactory creates a new mock instance. +func NewMockTaskFactory(ctrl *gomock.Controller) *MockTaskFactory { + mock := &MockTaskFactory{ctrl: ctrl} + mock.recorder = &MockTaskFactoryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTaskFactory) EXPECT() *MockTaskFactoryMockRecorder { + return m.recorder +} + +// NewCascadeRegistrationTask mocks base method. +func (m *MockTaskFactory) NewCascadeRegistrationTask() cascade.RegistrationTaskService { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewCascadeRegistrationTask") + ret0, _ := ret[0].(cascade.RegistrationTaskService) + return ret0 +} + +// NewCascadeRegistrationTask indicates an expected call of NewCascadeRegistrationTask. +func (mr *MockTaskFactoryMockRecorder) NewCascadeRegistrationTask() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewCascadeRegistrationTask", reflect.TypeOf((*MockTaskFactory)(nil).NewCascadeRegistrationTask)) +} + +// MockRegistrationTaskService is a mock of RegistrationTaskService interface. +type MockRegistrationTaskService struct { + ctrl *gomock.Controller + recorder *MockRegistrationTaskServiceMockRecorder +} + +// MockRegistrationTaskServiceMockRecorder is the mock recorder for MockRegistrationTaskService. +type MockRegistrationTaskServiceMockRecorder struct { + mock *MockRegistrationTaskService +} + +// NewMockRegistrationTaskService creates a new mock instance. +func NewMockRegistrationTaskService(ctrl *gomock.Controller) *MockRegistrationTaskService { + mock := &MockRegistrationTaskService{ctrl: ctrl} + mock.recorder = &MockRegistrationTaskServiceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockRegistrationTaskService) EXPECT() *MockRegistrationTaskServiceMockRecorder { + return m.recorder +} + +// Register mocks base method. +func (m *MockRegistrationTaskService) Register(ctx context.Context, req *cascade.RegisterRequest, send func(*cascade.RegisterResponse) error) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Register", ctx, req, send) + ret0, _ := ret[0].(error) + return ret0 +} + +// Register indicates an expected call of Register. +func (mr *MockRegistrationTaskServiceMockRecorder) Register(ctx, req, send interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Register", reflect.TypeOf((*MockRegistrationTaskService)(nil).Register), ctx, req, send) +} diff --git a/supernode/services/cascade/register.go b/supernode/services/cascade/register.go index c044f3bc..705fff9e 100644 --- a/supernode/services/cascade/register.go +++ b/supernode/services/cascade/register.go @@ -113,20 +113,20 @@ func (task *CascadeRegistrationTask) Register( task.streamEvent(SupernodeEventTypeRQIDsGenerated, "rq-id files have been generated", "", send) /* 9. Consistency checks ------------------------------------------------------- */ - if err := verifyIDs(ctx, layout, encResp.Metadata); err != nil { - return err + if err := verifyIDs(layout, encResp.Metadata); err != nil { + return task.wrapErr(ctx, "failed to verify IDs", err, fields) } logtrace.Info(ctx, "rq-ids have been verified", fields) task.streamEvent(SupernodeEventTypeRqIDsVerified, "rq-ids have been verified", "", send) /* 10. Persist artefacts -------------------------------------------------------- */ - if err := task.storeArtefacts(ctx, rqidResp.RedundantMetadataFiles, encResp.SymbolsDir, fields); err != nil { + if err := task.storeArtefacts(ctx, action.ActionID, rqidResp.RedundantMetadataFiles, encResp.SymbolsDir, fields); err != nil { return err } logtrace.Info(ctx, "artefacts have been stored", fields) task.streamEvent(SupernodeEventTypeArtefactsStored, "artefacts have been stored", "", send) - resp, err := task.lumeraClient.ActionMsg().FinalizeCascadeAction(ctx, action.ActionID, rqidResp.RQIDs) + resp, err := task.lumeraClient.FinalizeAction(ctx, action.ActionID, rqidResp.RQIDs) if err != nil { fields[logtrace.FieldError] = err.Error() logtrace.Info(ctx, "Finalize Action Error", fields) diff --git a/supernode/services/cascade/service.go b/supernode/services/cascade/service.go index e6cdf147..ec8f6373 100644 --- a/supernode/services/cascade/service.go +++ b/supernode/services/cascade/service.go @@ -6,8 +6,8 @@ import ( "github.com/LumeraProtocol/supernode/p2p" "github.com/LumeraProtocol/supernode/pkg/codec" "github.com/LumeraProtocol/supernode/pkg/lumera" - "github.com/LumeraProtocol/supernode/pkg/storage/rqstore" + "github.com/LumeraProtocol/supernode/supernode/services/cascade/adaptors" "github.com/LumeraProtocol/supernode/supernode/services/common" ) @@ -15,15 +15,15 @@ type CascadeService struct { *common.SuperNodeService config *Config - lumeraClient lumera.Client - rqstore rqstore.Store - codec codec.Codec + lumeraClient adaptors.LumeraClient + p2p adaptors.P2PService + rq adaptors.CodecService } // NewCascadeRegistrationTask creates a new task for cascade registration -func (s *CascadeService) NewCascadeRegistrationTask() *CascadeRegistrationTask { - task := NewCascadeRegistrationTask(s) - s.Worker.AddTask(task) +func (service *CascadeService) NewCascadeRegistrationTask() RegistrationTaskService { + task := NewCascadeRegistrationTask(service) + service.Worker.AddTask(task) return task } @@ -37,13 +37,8 @@ func NewCascadeService(config *Config, lumera lumera.Client, p2pClient p2p.Clien return &CascadeService{ config: config, SuperNodeService: common.NewSuperNodeService(p2pClient), - lumeraClient: lumera, - codec: codec, - rqstore: rqstore, + lumeraClient: adaptors.NewLumeraClient(lumera), + p2p: adaptors.NewP2PService(p2pClient, rqstore), + rq: adaptors.NewCodecService(codec), } } - -// GetSNAddress returns the supernode account address -func (s *CascadeService) GetSNAddress() string { - return s.config.SupernodeAccountAddress -} diff --git a/supernode/services/cascade/task.go b/supernode/services/cascade/task.go index 2cfa5e09..92096943 100644 --- a/supernode/services/cascade/task.go +++ b/supernode/services/cascade/task.go @@ -37,7 +37,6 @@ func NewCascadeRegistrationTask(service *CascadeService) *CascadeRegistrationTas task := &CascadeRegistrationTask{ SuperNodeTask: common.NewSuperNodeTask(logPrefix), CascadeService: service, - storage: common.NewStorageHandler(service.P2PClient, service.config.RqFilesDir, service.rqstore), } return task diff --git a/supernode/services/common/p2p.go b/supernode/services/common/p2p.go index a477a591..535f9298 100644 --- a/supernode/services/common/p2p.go +++ b/supernode/services/common/p2p.go @@ -8,14 +8,4 @@ const ( P2PDataRaptorQSymbol // 1 // P2PDataCascadeMetadata cascade ID file P2PDataCascadeMetadata // 2 - // P2PDataDDMetadata dd fp metadata file - P2PDataDDMetadata // 3 - // P2PPreviewThumbnail preview NFT thumbnail - P2PPreviewThumbnail // 4 - // P2PMediumThumbnail NFT medium thumbnail - P2PMediumThumbnail // 5 - // P2PSmallThumbnail small NFT thumbnail - P2PSmallThumbnail // 6 - // P2PDebug debug - P2PDebug // 7 )