diff --git a/gen/supernode/action/cascade/service.pb.go b/gen/supernode/action/cascade/service.pb.go index 64207cd9..d084e367 100644 --- a/gen/supernode/action/cascade/service.pb.go +++ b/gen/supernode/action/cascade/service.pb.go @@ -35,7 +35,7 @@ const ( SupernodeEventType_RQID_VERIFIED SupernodeEventType = 9 SupernodeEventType_ARTEFACTS_STORED SupernodeEventType = 10 SupernodeEventType_ACTION_FINALIZED SupernodeEventType = 11 - SupernodeEventType_Artefacts_Downloaded SupernodeEventType = 12 + SupernodeEventType_ARTEFACTS_DOWNLOADED SupernodeEventType = 12 ) // Enum value maps for SupernodeEventType. @@ -53,7 +53,7 @@ var ( 9: "RQID_VERIFIED", 10: "ARTEFACTS_STORED", 11: "ACTION_FINALIZED", - 12: "Artefacts_Downloaded", + 12: "ARTEFACTS_DOWNLOADED", } SupernodeEventType_value = map[string]int32{ "UNKNOWN": 0, @@ -68,7 +68,7 @@ var ( "RQID_VERIFIED": 9, "ARTEFACTS_STORED": 10, "ACTION_FINALIZED": 11, - "Artefacts_Downloaded": 12, + "ARTEFACTS_DOWNLOADED": 12, } ) @@ -821,8 +821,8 @@ var file_supernode_action_cascade_service_proto_rawDesc = []byte{ 0x52, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x09, 0x12, 0x14, 0x0a, 0x10, 0x41, 0x52, 0x54, 0x45, 0x46, 0x41, 0x43, 0x54, 0x53, 0x5f, 0x53, 0x54, 0x4f, 0x52, 0x45, 0x44, 0x10, 0x0a, 0x12, 0x14, 0x0a, 0x10, 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x46, 0x49, 0x4e, 0x41, 0x4c, 0x49, 0x5a, - 0x45, 0x44, 0x10, 0x0b, 0x12, 0x18, 0x0a, 0x14, 0x41, 0x72, 0x74, 0x65, 0x66, 0x61, 0x63, 0x74, - 0x73, 0x5f, 0x44, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x65, 0x64, 0x10, 0x0c, 0x32, 0xe2, + 0x45, 0x44, 0x10, 0x0b, 0x12, 0x18, 0x0a, 0x14, 0x41, 0x52, 0x54, 0x45, 0x46, 0x41, 0x43, 0x54, + 0x53, 0x5f, 0x44, 0x4f, 0x57, 0x4e, 0x4c, 0x4f, 0x41, 0x44, 0x45, 0x44, 0x10, 0x0c, 0x32, 0xe2, 0x01, 0x0a, 0x0e, 0x43, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x43, 0x0a, 0x08, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x12, 0x18, 0x2e, 0x63, 0x61, 0x73, 0x63, 0x61, 0x64, 0x65, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, diff --git a/pkg/codec/decode.go b/pkg/codec/decode.go index 7e36db8c..9d82f0fd 100644 --- a/pkg/codec/decode.go +++ b/pkg/codec/decode.go @@ -2,6 +2,7 @@ package codec import ( "context" + "encoding/json" "fmt" "os" "path/filepath" @@ -17,8 +18,8 @@ type DecodeRequest struct { } type DecodeResponse struct { - Path string - LayoutPath string + Path string + DecodeTmpDir string } func (rq *raptorQ) Decode(ctx context.Context, req DecodeRequest) (DecodeResponse, error) { @@ -52,14 +53,27 @@ func (rq *raptorQ) Decode(ctx context.Context, req DecodeRequest) (DecodeRespons } logtrace.Info(ctx, "symbols written to disk", fields) + // ---------- write layout.json ---------- ←★ + layoutPath := filepath.Join(symbolsDir, "layout.json") + layoutBytes, err := json.Marshal(req.Layout) + if err != nil { + fields[logtrace.FieldError] = err.Error() + return DecodeResponse{}, fmt.Errorf("marshal layout: %w", err) + } + if err := os.WriteFile(layoutPath, layoutBytes, 0o644); err != nil { + fields[logtrace.FieldError] = err.Error() + return DecodeResponse{}, fmt.Errorf("write layout file: %w", err) + } + logtrace.Info(ctx, "layout.json written", fields) + // Decode outputPath := filepath.Join(symbolsDir, "output") - if err := processor.DecodeSymbols(symbolsDir, outputPath, ""); err != nil { + if err := processor.DecodeSymbols(symbolsDir, outputPath, layoutPath); err != nil { fields[logtrace.FieldError] = err.Error() _ = os.Remove(outputPath) return DecodeResponse{}, fmt.Errorf("raptorq decode: %w", err) } logtrace.Info(ctx, "RaptorQ decoding completed successfully", fields) - return DecodeResponse{Path: outputPath, LayoutPath: ""}, nil + return DecodeResponse{Path: outputPath, DecodeTmpDir: symbolsDir}, nil } diff --git a/proto/supernode/action/cascade/service.proto b/proto/supernode/action/cascade/service.proto index f4afd5be..23be1d35 100644 --- a/proto/supernode/action/cascade/service.proto +++ b/proto/supernode/action/cascade/service.proto @@ -59,7 +59,7 @@ enum SupernodeEventType { RQID_VERIFIED = 9; ARTEFACTS_STORED = 10; ACTION_FINALIZED = 11; - Artefacts_Downloaded = 12; + ARTEFACTS_DOWNLOADED = 12; } message HealthCheckRequest {} diff --git a/sdk/action/client.go b/sdk/action/client.go index ed4dabb6..ca19ec57 100644 --- a/sdk/action/client.go +++ b/sdk/action/client.go @@ -25,6 +25,7 @@ type Client interface { GetTask(ctx context.Context, taskID string) (*task.TaskEntry, bool) SubscribeToEvents(ctx context.Context, eventType event.EventType, handler event.Handler) error SubscribeToAllEvents(ctx context.Context, handler event.Handler) error + DownloadCascade(ctx context.Context, actionID, outputPath string) (string, error) } // ClientImpl implements the Client interface @@ -128,3 +129,25 @@ func (c *ClientImpl) SubscribeToAllEvents(ctx context.Context, handler event.Han return nil } + +func (c *ClientImpl) DownloadCascade( + ctx context.Context, + actionID, outputPath string, +) (string, error) { + + if actionID == "" { + return "", fmt.Errorf("actionID is empty") + } + + taskID, err := c.taskManager.CreateDownloadTask(ctx, actionID, outputPath) + if err != nil { + return "", fmt.Errorf("create download task: %w", err) + } + + c.logger.Info(ctx, "cascade download task created", + "task_id", taskID, + "action_id", actionID, + ) + + return taskID, nil +} diff --git a/sdk/adapters/supernodeservice/adapter.go b/sdk/adapters/supernodeservice/adapter.go index e3405413..7b4cc7a4 100644 --- a/sdk/adapters/supernodeservice/adapter.go +++ b/sdk/adapters/supernodeservice/adapter.go @@ -164,6 +164,98 @@ func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *Casca }, nil } +// CascadeSupernodeDownload downloads a file from a supernode gRPC stream +func (a *cascadeAdapter) CascadeSupernodeDownload( + ctx context.Context, + in *CascadeSupernodeDownloadRequest, + opts ...grpc.CallOption, +) (*CascadeSupernodeDownloadResponse, error) { + + ctx = net.AddCorrelationID(ctx) + + // 1. Open gRPC stream (server-stream) + stream, err := a.client.Download(ctx, &cascade.DownloadRequest{ + ActionId: in.ActionID, + }, opts...) + if err != nil { + a.logger.Error(ctx, "failed to create download stream", + "action_id", in.ActionID, "error", err) + return nil, err + } + + // 2. Prepare destination file + outFile, err := os.Create(in.OutputPath) + if err != nil { + a.logger.Error(ctx, "failed to create output file", + "path", in.OutputPath, "error", err) + return nil, fmt.Errorf("create output file: %w", err) + } + defer outFile.Close() + + var ( + bytesWritten int64 + chunkIndex int + ) + + // 3. Receive streamed responses + for { + resp, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + return nil, fmt.Errorf("stream recv: %w", err) + } + + switch x := resp.ResponseType.(type) { + + // 3a. Progress / event message + case *cascade.DownloadResponse_Event: + a.logger.Info(ctx, "supernode event", + "event_type", x.Event.EventType, + "message", x.Event.Message, + "action_id", in.ActionID) + + if in.EventLogger != nil { + in.EventLogger(ctx, toSdkEvent(x.Event.EventType), x.Event.Message, event.EventData{ + event.KeyActionID: in.ActionID, + event.KeyEventType: x.Event.EventType, + event.KeyMessage: x.Event.Message, + }) + } + + // 3b. Actual data chunk + case *cascade.DownloadResponse_Chunk: + data := x.Chunk.Data + if len(data) == 0 { + continue + } + if _, err := outFile.Write(data); err != nil { + return nil, fmt.Errorf("write chunk: %w", err) + } + + bytesWritten += int64(len(data)) + chunkIndex++ + + a.logger.Debug(ctx, "received chunk", + "chunk_index", chunkIndex, + "chunk_size", len(data), + "bytes_written", bytesWritten) + } + } + + a.logger.Info(ctx, "download complete", + "bytes_written", bytesWritten, + "path", in.OutputPath, + "action_id", in.ActionID) + + return &CascadeSupernodeDownloadResponse{ + Success: true, + Message: "artefact downloaded", + OutputPath: in.OutputPath, + }, nil +} + // toSdkEvent converts a supernode-side enum value into an internal SDK EventType. func toSdkEvent(e cascade.SupernodeEventType) event.EventType { switch e { @@ -189,6 +281,8 @@ func toSdkEvent(e cascade.SupernodeEventType) event.EventType { return event.SupernodeArtefactsStored case cascade.SupernodeEventType_ACTION_FINALIZED: return event.SupernodeActionFinalized + case cascade.SupernodeEventType_ARTEFACTS_DOWNLOADED: + return event.SupernodeArtefactsDownloaded default: return event.SupernodeUnknown } diff --git a/sdk/adapters/supernodeservice/types.go b/sdk/adapters/supernodeservice/types.go index e110d1d4..f78decf9 100644 --- a/sdk/adapters/supernodeservice/types.go +++ b/sdk/adapters/supernodeservice/types.go @@ -28,7 +28,21 @@ type CascadeSupernodeRegisterResponse struct { TxHash string } +type CascadeSupernodeDownloadRequest struct { + ActionID string + TaskID string + OutputPath string + EventLogger LoggerFunc +} + +type CascadeSupernodeDownloadResponse struct { + Success bool + Message string + OutputPath string +} + //go:generate mockery --name=CascadeServiceClient --output=testutil/mocks --outpkg=mocks --filename=cascade_service_mock.go type CascadeServiceClient interface { CascadeSupernodeRegister(ctx context.Context, in *CascadeSupernodeRegisterRequest, opts ...grpc.CallOption) (*CascadeSupernodeRegisterResponse, error) + CascadeSupernodeDownload(ctx context.Context, in *CascadeSupernodeDownloadRequest, opts ...grpc.CallOption) (*CascadeSupernodeDownloadResponse, error) } diff --git a/sdk/event/keys.go b/sdk/event/keys.go index 87895280..728c4276 100644 --- a/sdk/event/keys.go +++ b/sdk/event/keys.go @@ -14,6 +14,7 @@ const ( KeyMessage EventDataKey = "message" KeyProgress EventDataKey = "progress" KeyEventType EventDataKey = "event_type" + KeyOutputPath EventDataKey = "output_path" // Task specific keys KeyTaskID EventDataKey = "task_id" diff --git a/sdk/event/types.go b/sdk/event/types.go index ce782617..ab1010d2 100644 --- a/sdk/event/types.go +++ b/sdk/event/types.go @@ -23,21 +23,27 @@ const ( SDKTaskTxHashReceived EventType = "sdk:txhash_received" SDKTaskCompleted EventType = "sdk:completed" SDKTaskFailed EventType = "sdk:failed" + + SDKDownloadAttempt EventType = "sdk:download_attempt" + SDKDownloadFailure EventType = "sdk:download_failure" + SDKOutputPathReceived EventType = "sdk:output_path_received" + SDKDownloadSuccessful EventType = "sdk:download_successful" ) const ( - SupernodeActionRetrieved EventType = "supernode:action_retrieved" - SupernodeActionFeeVerified EventType = "supernode:action_fee_verified" - SupernodeTopCheckPassed EventType = "supernode:top_check_passed" - SupernodeMetadataDecoded EventType = "supernode:metadata_decoded" - SupernodeDataHashVerified EventType = "supernode:data_hash_verified" - SupernodeInputEncoded EventType = "supernode:input_encoded" - SupernodeSignatureVerified EventType = "supernode:signature_verified" - SupernodeRQIDGenerated EventType = "supernode:rqid_generated" - SupernodeRQIDVerified EventType = "supernode:rqid_verified" - SupernodeArtefactsStored EventType = "supernode:artefacts_stored" - SupernodeActionFinalized EventType = "supernode:action_finalized" - SupernodeUnknown EventType = "supernode:unknown" + SupernodeActionRetrieved EventType = "supernode:action_retrieved" + SupernodeActionFeeVerified EventType = "supernode:action_fee_verified" + SupernodeTopCheckPassed EventType = "supernode:top_check_passed" + SupernodeMetadataDecoded EventType = "supernode:metadata_decoded" + SupernodeDataHashVerified EventType = "supernode:data_hash_verified" + SupernodeInputEncoded EventType = "supernode:input_encoded" + SupernodeSignatureVerified EventType = "supernode:signature_verified" + SupernodeRQIDGenerated EventType = "supernode:rqid_generated" + SupernodeRQIDVerified EventType = "supernode:rqid_verified" + SupernodeArtefactsStored EventType = "supernode:artefacts_stored" + SupernodeActionFinalized EventType = "supernode:action_finalized" + SupernodeArtefactsDownloaded EventType = "supernode:artefacts_downloaded" + SupernodeUnknown EventType = "supernode:unknown" ) // EventData is a map of event data attributes using standardized keys diff --git a/sdk/net/client.go b/sdk/net/client.go index 16564d4d..ee058d84 100644 --- a/sdk/net/client.go +++ b/sdk/net/client.go @@ -15,6 +15,9 @@ type SupernodeClient interface { // HealthCheck performs a health check on the supernode HealthCheck(ctx context.Context) (*grpc_health_v1.HealthCheckResponse, error) + // Download downloads the cascade action file + Download(ctx context.Context, in *supernodeservice.CascadeSupernodeDownloadRequest, opts ...grpc.CallOption) (*supernodeservice.CascadeSupernodeDownloadResponse, error) + // Close releases resources used by the client Close(ctx context.Context) error } diff --git a/sdk/net/impl.go b/sdk/net/impl.go index 27713263..bccfc367 100644 --- a/sdk/net/impl.go +++ b/sdk/net/impl.go @@ -129,6 +129,16 @@ func (c *supernodeClient) HealthCheck(ctx context.Context) (*grpc_health_v1.Heal return resp, nil } +// Download downloads the cascade action file +func (c *supernodeClient) Download(ctx context.Context, in *supernodeservice.CascadeSupernodeDownloadRequest, opts ...grpc.CallOption) (*supernodeservice.CascadeSupernodeDownloadResponse, error) { + resp, err := c.cascadeClient.CascadeSupernodeDownload(ctx, in, opts...) + if err != nil { + return nil, fmt.Errorf("get artefacts failed: %w", err) + } + + return resp, nil +} + // Close closes the connection to the supernode func (c *supernodeClient) Close(ctx context.Context) error { if c.conn != nil { diff --git a/sdk/task/cascade.go b/sdk/task/cascade.go index 279a3ea6..ba15fdbe 100644 --- a/sdk/task/cascade.go +++ b/sdk/task/cascade.go @@ -2,18 +2,13 @@ package task import ( "context" - "errors" "fmt" - "sync" "time" - "github.com/LumeraProtocol/supernode/pkg/logtrace" "github.com/LumeraProtocol/supernode/sdk/adapters/lumera" "github.com/LumeraProtocol/supernode/sdk/adapters/supernodeservice" "github.com/LumeraProtocol/supernode/sdk/event" "github.com/LumeraProtocol/supernode/sdk/net" - "golang.org/x/sync/errgroup" - "google.golang.org/grpc/health/grpc_health_v1" ) const ( @@ -63,66 +58,6 @@ func (t *CascadeTask) Run(ctx context.Context) error { return nil } -func (t *CascadeTask) fetchSupernodes(ctx context.Context, height int64) (lumera.Supernodes, error) { - sns, err := t.client.GetSupernodes(ctx, height) - if err != nil { - return nil, fmt.Errorf("fetch supernodes: %w", err) - } - - if len(sns) == 0 { - return nil, errors.New("no supernodes found") - } - - if len(sns) > 10 { - sns = sns[:10] - } - - // Keep only SERVING nodes (done in parallel – keeps latency flat) - healthy := make(lumera.Supernodes, 0, len(sns)) - eg, ctx := errgroup.WithContext(ctx) - mu := sync.Mutex{} - - for _, sn := range sns { - sn := sn - eg.Go(func() error { - if t.isServing(ctx, sn) { - mu.Lock() - healthy = append(healthy, sn) - mu.Unlock() - } - return nil - }) - } - if err := eg.Wait(); err != nil { - return nil, fmt.Errorf("health-check goroutines: %w", err) - } - - if len(healthy) == 0 { - return nil, errors.New("no healthy supernodes found") - } - - return healthy, nil -} - -// isServing pings the super-node once with a short timeout. -func (t *CascadeTask) isServing(parent context.Context, sn lumera.Supernode) bool { - ctx, cancel := context.WithTimeout(parent, connectionTimeout) - defer cancel() - - client, err := net.NewClientFactory(ctx, t.logger, t.keyring, t.client, net.FactoryConfig{ - LocalCosmosAddress: t.config.Account.LocalCosmosAddress, - PeerType: t.config.Account.PeerType, - }).CreateClient(ctx, sn) - if err != nil { - logtrace.Info(ctx, "Failed to create client for supernode", logtrace.Fields{logtrace.FieldMethod: "isServing"}) - return false - } - defer client.Close(ctx) - - resp, err := client.HealthCheck(ctx) - return err == nil && resp.Status == grpc_health_v1.HealthCheckResponse_SERVING -} - func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lumera.Supernodes) error { factoryCfg := net.FactoryConfig{ LocalCosmosAddress: t.config.Account.LocalCosmosAddress, diff --git a/sdk/task/download.go b/sdk/task/download.go new file mode 100644 index 00000000..36b99c44 --- /dev/null +++ b/sdk/task/download.go @@ -0,0 +1,131 @@ +package task + +import ( + "context" + "fmt" + "time" + + "github.com/LumeraProtocol/supernode/sdk/adapters/lumera" + "github.com/LumeraProtocol/supernode/sdk/adapters/supernodeservice" + "github.com/LumeraProtocol/supernode/sdk/event" + "github.com/LumeraProtocol/supernode/sdk/net" +) + +// timeouts +const ( + downloadTimeout = 5 * time.Minute +) + +type CascadeDownloadTask struct { + BaseTask + actionId string + outputPath string +} + +func NewCascadeDownloadTask(base BaseTask, actionId string, outputPath string) *CascadeDownloadTask { + return &CascadeDownloadTask{ + BaseTask: base, + actionId: actionId, + outputPath: outputPath, + } +} + +func (t *CascadeDownloadTask) Run(ctx context.Context) error { + t.LogEvent(ctx, event.SDKTaskStarted, "Running cascade download task", nil) + + // 1 – fetch super-nodes + supernodes, err := t.fetchSupernodes(ctx, t.Action.Height) + if err != nil { + t.LogEvent(ctx, event.SDKSupernodesUnavailable, "super-nodes unavailable", event.EventData{event.KeyError: err.Error()}) + t.LogEvent(ctx, event.SDKTaskFailed, "task failed", event.EventData{event.KeyError: err.Error()}) + return err + } + t.LogEvent(ctx, event.SDKSupernodesFound, "super-nodes found", event.EventData{event.KeyCount: len(supernodes)}) + + // 2 – download from super-nodes + if err := t.downloadFromSupernodes(ctx, supernodes); err != nil { + t.LogEvent(ctx, event.SDKTaskFailed, "task failed", event.EventData{event.KeyError: err.Error()}) + return err + } + + t.LogEvent(ctx, event.SDKTaskCompleted, "cascade download completed successfully", nil) + return nil +} + +func (t *CascadeDownloadTask) downloadFromSupernodes(ctx context.Context, supernodes lumera.Supernodes) error { + factoryCfg := net.FactoryConfig{ + LocalCosmosAddress: t.config.Account.LocalCosmosAddress, + PeerType: t.config.Account.PeerType, + } + clientFactory := net.NewClientFactory(ctx, t.logger, t.keyring, t.client, factoryCfg) + + req := &supernodeservice.CascadeSupernodeDownloadRequest{ + ActionID: t.actionId, + TaskID: t.TaskID, + OutputPath: t.outputPath, + } + + var lastErr error + for idx, sn := range supernodes { + t.LogEvent(ctx, event.SDKDownloadAttempt, "attempting download from super-node", event.EventData{ + event.KeySupernode: sn.GrpcEndpoint, + event.KeySupernodeAddress: sn.CosmosAddress, + event.KeyIteration: idx + 1, + }) + + if err := t.attemptDownload(ctx, sn, clientFactory, req); err != nil { + lastErr = err + t.LogEvent(ctx, event.SDKDownloadFailure, "download from super-node failed", event.EventData{ + event.KeySupernode: sn.GrpcEndpoint, + event.KeySupernodeAddress: sn.CosmosAddress, + event.KeyIteration: idx + 1, + event.KeyError: err.Error(), + }) + continue + } + + t.LogEvent(ctx, event.SDKDownloadSuccessful, "download successful", event.EventData{ + event.KeySupernode: sn.GrpcEndpoint, + event.KeySupernodeAddress: sn.CosmosAddress, + event.KeyIteration: idx + 1, + }) + return nil + } + return fmt.Errorf("failed to download from all super-nodes: %w", lastErr) +} + +func (t *CascadeDownloadTask) attemptDownload( + parent context.Context, + sn lumera.Supernode, + factory *net.ClientFactory, + req *supernodeservice.CascadeSupernodeDownloadRequest, +) error { + + ctx, cancel := context.WithTimeout(parent, downloadTimeout) + defer cancel() + + client, err := factory.CreateClient(ctx, sn) + if err != nil { + return fmt.Errorf("create client %s: %w", sn.CosmosAddress, err) + } + defer client.Close(ctx) + + req.EventLogger = func(ctx context.Context, evt event.EventType, msg string, data event.EventData) { + t.LogEvent(ctx, evt, msg, data) + } + + resp, err := client.Download(ctx, req) + if err != nil { + return fmt.Errorf("download from %s: %w", sn.CosmosAddress, err) + } + if !resp.Success { + return fmt.Errorf("download rejected by %s: %s", sn.CosmosAddress, resp.Message) + } + + t.LogEvent(ctx, event.SDKOutputPathReceived, "file downloaded", event.EventData{ + event.KeyOutputPath: resp.OutputPath, + event.KeySupernode: sn.CosmosAddress, + }) + + return nil +} diff --git a/sdk/task/helpers.go b/sdk/task/helpers.go index 7e2abbd2..1646a326 100644 --- a/sdk/task/helpers.go +++ b/sdk/task/helpers.go @@ -80,3 +80,22 @@ func (m *ManagerImpl) validateSignature(ctx context.Context, action lumera.Actio return nil } + +func (m *ManagerImpl) validateDownloadAction(ctx context.Context, actionID string) (lumera.Action, error) { + action, err := m.lumeraClient.GetAction(ctx, actionID) + if err != nil { + return lumera.Action{}, fmt.Errorf("failed to get action: %w", err) + } + + // Check if action exists + if action.ID == "" { + return lumera.Action{}, fmt.Errorf("no action found with the specified ID") + } + + // Check action state + if action.State != lumera.ACTION_STATE_DONE { + return lumera.Action{}, fmt.Errorf("action is in %s state, expected DONE", action.State) + } + + return action, nil +} diff --git a/sdk/task/manager.go b/sdk/task/manager.go index 327a3eef..1f1f0a19 100644 --- a/sdk/task/manager.go +++ b/sdk/task/manager.go @@ -24,6 +24,8 @@ type Manager interface { DeleteTask(ctx context.Context, taskID string) error SubscribeToEvents(ctx context.Context, eventType event.EventType, handler event.Handler) SubscribeToAllEvents(ctx context.Context, handler event.Handler) + + CreateDownloadTask(ctx context.Context, actionID, outputPath string) (string, error) } type ManagerImpl struct { @@ -226,3 +228,53 @@ func (m *ManagerImpl) Close(ctx context.Context) { m.taskCache.Close(ctx) } } + +func (m *ManagerImpl) CreateDownloadTask( + ctx context.Context, + actionID string, + outputPath string, +) (string, error) { + + // First validate the action before creating the task + action, err := m.validateDownloadAction(ctx, actionID) + if err != nil { + return "", err + } + + taskID := uuid.New().String()[:8] + + m.logger.Debug(ctx, "Generated download task ID", "task_id", taskID) + + baseTask := BaseTask{ + TaskID: taskID, + ActionID: actionID, + TaskType: TaskTypeCascade, + Action: action, + client: m.lumeraClient, + keyring: m.keyring, + config: m.config, + onEvent: m.handleEvent, + logger: m.logger, + } + + task := NewCascadeDownloadTask(baseTask, actionID, outputPath) + + // Store task in cache + m.taskCache.Set(ctx, taskID, task, TaskTypeCascade, actionID) + + // Ensure task is stored before returning + m.taskCache.Wait() + + go func() { + m.logger.Debug(ctx, "Starting download cascade task asynchronously", "taskID", taskID) + err := task.Run(ctx) + if err != nil { + // Error handling is done via events in the task.Run method + // This is just a failsafe in case something goes wrong + m.logger.Error(ctx, "Download Cascade task failed with error", "taskID", taskID, "error", err) + } + }() + + m.logger.Info(ctx, "Download Cascade task created successfully", "taskID", taskID) + return taskID, nil +} diff --git a/sdk/task/task.go b/sdk/task/task.go index 1b738d68..9cb97939 100644 --- a/sdk/task/task.go +++ b/sdk/task/task.go @@ -2,11 +2,18 @@ package task import ( "context" + "errors" + "fmt" + "sync" + "github.com/LumeraProtocol/supernode/pkg/errgroup" + "github.com/LumeraProtocol/supernode/pkg/logtrace" "github.com/LumeraProtocol/supernode/sdk/adapters/lumera" "github.com/LumeraProtocol/supernode/sdk/config" "github.com/LumeraProtocol/supernode/sdk/event" "github.com/LumeraProtocol/supernode/sdk/log" + "github.com/LumeraProtocol/supernode/sdk/net" + "google.golang.org/grpc/health/grpc_health_v1" "github.com/cosmos/cosmos-sdk/crypto/keyring" ) @@ -67,3 +74,63 @@ func (t *BaseTask) LogEvent(ctx context.Context, evt event.EventType, msg string t.logger.Info(ctx, msg, kvs...) t.emitEvent(ctx, evt, additionalInfo) } + +func (t *BaseTask) fetchSupernodes(ctx context.Context, height int64) (lumera.Supernodes, error) { + sns, err := t.client.GetSupernodes(ctx, height) + if err != nil { + return nil, fmt.Errorf("fetch supernodes: %w", err) + } + + if len(sns) == 0 { + return nil, errors.New("no supernodes found") + } + + if len(sns) > 10 { + sns = sns[:10] + } + + // Keep only SERVING nodes (done in parallel – keeps latency flat) + healthy := make(lumera.Supernodes, 0, len(sns)) + eg, ctx := errgroup.WithContext(ctx) + mu := sync.Mutex{} + + for _, sn := range sns { + sn := sn + eg.Go(func() error { + if t.isServing(ctx, sn) { + mu.Lock() + healthy = append(healthy, sn) + mu.Unlock() + } + return nil + }) + } + if err := eg.Wait(); err != nil { + return nil, fmt.Errorf("health-check goroutines: %w", err) + } + + if len(healthy) == 0 { + return nil, errors.New("no healthy supernodes found") + } + + return healthy, nil +} + +// isServing pings the super-node once with a short timeout. +func (t *BaseTask) isServing(parent context.Context, sn lumera.Supernode) bool { + ctx, cancel := context.WithTimeout(parent, connectionTimeout) + defer cancel() + + client, err := net.NewClientFactory(ctx, t.logger, t.keyring, t.client, net.FactoryConfig{ + LocalCosmosAddress: t.config.Account.LocalCosmosAddress, + PeerType: t.config.Account.PeerType, + }).CreateClient(ctx, sn) + if err != nil { + logtrace.Info(ctx, "Failed to create client for supernode", logtrace.Fields{logtrace.FieldMethod: "isServing"}) + return false + } + defer client.Close(ctx) + + resp, err := client.HealthCheck(ctx) + return err == nil && resp.Status == grpc_health_v1.HealthCheckResponse_SERVING +} diff --git a/supernode/node/action/server/cascade/cascade_action_server.go b/supernode/node/action/server/cascade/cascade_action_server.go index 7feedffe..238b53d6 100644 --- a/supernode/node/action/server/cascade/cascade_action_server.go +++ b/supernode/node/action/server/cascade/cascade_action_server.go @@ -197,6 +197,7 @@ func (server *ActionServer) Download(req *pb.DownloadRequest, stream pb.CascadeS task := server.factory.NewCascadeRegistrationTask() var restoredFile []byte + var tmpDir string err := task.Download(ctx, &cascadeService.DownloadRequest{ ActionID: req.GetActionId(), @@ -212,6 +213,7 @@ func (server *ActionServer) Download(req *pb.DownloadRequest, stream pb.CascadeS if len(resp.Artefacts) > 0 { restoredFile = resp.Artefacts + tmpDir = resp.DownloadedDir } return stream.Send(grpcResp) @@ -254,6 +256,15 @@ func (server *ActionServer) Download(req *pb.DownloadRequest, stream pb.CascadeS } } + err = task.DownloadCleanup(ctx, tmpDir) + if err != nil { + logtrace.Error(ctx, "error cleaning up the tmp dir", logtrace.Fields{ + logtrace.FieldError: err.Error(), + }) + } + fields["tmp_dir"] = tmpDir + logtrace.Info(ctx, "tmp dir has been cleaned up", fields) + logtrace.Info(ctx, "completed streaming all chunks", fields) return nil } diff --git a/supernode/services/cascade/adaptors/rq.go b/supernode/services/cascade/adaptors/rq.go index b3ce70fc..5f4670fc 100644 --- a/supernode/services/cascade/adaptors/rq.go +++ b/supernode/services/cascade/adaptors/rq.go @@ -54,8 +54,8 @@ type DecodeRequest struct { } type DecodeResponse struct { - LayoutPath string - FilePath string + DecodeTmpDir string + FilePath string } // Decode decodes the provided symbols and returns the original file @@ -70,7 +70,7 @@ func (c *codecImpl) Decode(ctx context.Context, req DecodeRequest) (DecodeRespon } return DecodeResponse{ - LayoutPath: resp.LayoutPath, - FilePath: resp.Path, + FilePath: resp.Path, + DecodeTmpDir: resp.DecodeTmpDir, }, nil } diff --git a/supernode/services/cascade/download.go b/supernode/services/cascade/download.go index bc234b9c..94ce976c 100644 --- a/supernode/services/cascade/download.go +++ b/supernode/services/cascade/download.go @@ -23,9 +23,10 @@ type DownloadRequest struct { } type DownloadResponse struct { - EventType SupernodeEventType - Message string - Artefacts []byte + EventType SupernodeEventType + Message string + Artefacts []byte + DownloadedDir string } func (task *CascadeRegistrationTask) Download( @@ -42,7 +43,7 @@ func (task *CascadeRegistrationTask) Download( return task.wrapErr(ctx, "failed to get action", err, fields) } logtrace.Info(ctx, "action has been retrieved", fields) - task.streamDownloadEvent(SupernodeEventTypeActionRetrieved, "action has been retrieved", nil, send) + task.streamDownloadEvent(SupernodeEventTypeActionRetrieved, "action has been retrieved", nil, "", send) if actionDetails.GetAction().State != actiontypes.ActionStateDone { err = errors.New("action is not in a valid state") @@ -51,7 +52,7 @@ func (task *CascadeRegistrationTask) Download( return task.wrapErr(ctx, "action not found", err, fields) } logtrace.Info(ctx, "action has been validated", fields) - task.streamDownloadEvent(SupernodeEventTypeActionFinalized, "action state has been validated", nil, send) + task.streamDownloadEvent(SupernodeEventTypeActionFinalized, "action state has been validated", nil, "", send) metadata, err := task.decodeCascadeMetadata(ctx, actionDetails.GetAction().Metadata, fields) if err != nil { @@ -59,20 +60,22 @@ func (task *CascadeRegistrationTask) Download( return task.wrapErr(ctx, "error decoding cascade metadata", err, fields) } logtrace.Info(ctx, "cascade metadata has been decoded", fields) - task.streamDownloadEvent(SupernodeEventTypeMetadataDecoded, "metadata has been decoded", nil, send) + task.streamDownloadEvent(SupernodeEventTypeMetadataDecoded, "metadata has been decoded", nil, "", send) - file, err := task.downloadArtifacts(ctx, actionDetails.GetAction().ActionID, metadata, fields) + file, tmpDir, err := task.downloadArtifacts(ctx, actionDetails.GetAction().ActionID, metadata, fields) if err != nil { fields[logtrace.FieldError] = err.Error() return task.wrapErr(ctx, "failed to download artifacts", err, fields) } logtrace.Info(ctx, "artifacts have been downloaded", fields) - task.streamDownloadEvent(SupernodeEventTypeArtefactsDownloaded, "artifacts have been downloaded", file, send) + task.streamDownloadEvent(SupernodeEventTypeArtefactsDownloaded, "artifacts have been downloaded", file, tmpDir, send) return nil } -func (task *CascadeRegistrationTask) downloadArtifacts(ctx context.Context, actionID string, metadata actiontypes.CascadeMetadata, fields logtrace.Fields) ([]byte, error) { +func (task *CascadeRegistrationTask) downloadArtifacts(ctx context.Context, actionID string, metadata actiontypes.CascadeMetadata, fields logtrace.Fields) ([]byte, string, error) { + logtrace.Info(ctx, "started downloading the artifacts", fields) + var layout codec.Layout for _, rqID := range metadata.RqIdsIds { rqIDFile, err := task.P2PClient.Retrieve(ctx, rqID) @@ -87,18 +90,19 @@ func (task *CascadeRegistrationTask) downloadArtifacts(ctx context.Context, acti continue } - if len(layout.Blocks) < int(float64(len(metadata.RqIdsIds))*requiredSymbolPercent/100) { - logtrace.Info(ctx, "not enough symbols found in RQ metadata", fields) - continue - } + //if len(layout.Blocks) < int(float64(len(metadata.RqIdsIds))*requiredSymbolPercent/100) { + // logtrace.Info(ctx, "not enough symbols found in RQ metadata", fields) + // continue + //} if err == nil { + logtrace.Info(ctx, "layout file retrieved", fields) break } } if len(layout.Blocks) == 0 { - return nil, errors.New("no symbols found in RQ metadata") + return nil, "", errors.New("no symbols found in RQ metadata") } return task.restoreFileFromLayout(ctx, layout, metadata.DataHash, actionID) @@ -109,7 +113,7 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout( layout codec.Layout, dataHash string, actionID string, -) ([]byte, error) { +) ([]byte, string, error) { fields := logtrace.Fields{ logtrace.FieldActionID: actionID, @@ -131,7 +135,7 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout( if err != nil { fields[logtrace.FieldError] = err.Error() logtrace.Error(ctx, "failed to retrieve symbols", fields) - return nil, fmt.Errorf("failed to retrieve symbols: %w", err) + return nil, "", fmt.Errorf("failed to retrieve symbols: %w", err) } fields["retrievedSymbols"] = len(symbols) @@ -146,14 +150,14 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout( if err != nil { fields[logtrace.FieldError] = err.Error() logtrace.Error(ctx, "failed to decode symbols", fields) - return nil, fmt.Errorf("decode symbols using RaptorQ: %w", err) + return nil, "", fmt.Errorf("decode symbols using RaptorQ: %w", err) } file, err := os.ReadFile(decodeInfo.FilePath) if err != nil { fields[logtrace.FieldError] = err.Error() logtrace.Error(ctx, "failed to read file", fields) - return nil, fmt.Errorf("read decoded file: %w", err) + return nil, "", fmt.Errorf("read decoded file: %w", err) } // 3. Validate hash (Blake3) @@ -161,26 +165,39 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout( if err != nil { fields[logtrace.FieldError] = err.Error() logtrace.Error(ctx, "failed to do hash", fields) - return nil, fmt.Errorf("hash file: %w", err) + return nil, "", fmt.Errorf("hash file: %w", err) } err = task.verifyDataHash(ctx, fileHash, dataHash, fields) if err != nil { logtrace.Error(ctx, "failed to verify hash", fields) fields[logtrace.FieldError] = err.Error() - return nil, err + return nil, decodeInfo.DecodeTmpDir, err } logtrace.Info(ctx, "file successfully restored and hash verified", fields) - return file, nil + return file, decodeInfo.DecodeTmpDir, nil } -func (task *CascadeRegistrationTask) streamDownloadEvent(eventType SupernodeEventType, msg string, file []byte, send func(resp *DownloadResponse) error) { +func (task *CascadeRegistrationTask) streamDownloadEvent(eventType SupernodeEventType, msg string, file []byte, tmpDir string, send func(resp *DownloadResponse) error) { _ = send(&DownloadResponse{ - EventType: eventType, - Message: msg, - Artefacts: file, + EventType: eventType, + Message: msg, + Artefacts: file, + DownloadedDir: tmpDir, }) return } + +func (task *CascadeRegistrationTask) DownloadCleanup(ctx context.Context, symbolsDir string) error { + if symbolsDir == "" { + return errors.New("symbolsDir path is empty") + } + + if err := os.RemoveAll(symbolsDir); err != nil { + return errors.Errorf("failed to delete symbols directory: %s, :%s", symbolsDir, err.Error()) + } + + return nil +} diff --git a/supernode/services/cascade/interfaces.go b/supernode/services/cascade/interfaces.go index 1c27c2f6..82c7002b 100644 --- a/supernode/services/cascade/interfaces.go +++ b/supernode/services/cascade/interfaces.go @@ -16,4 +16,5 @@ type RegistrationTaskService interface { Register(ctx context.Context, req *RegisterRequest, send func(resp *RegisterResponse) error) error HealthCheck(ctx context.Context) (HealthCheckResponse, error) Download(ctx context.Context, req *DownloadRequest, send func(resp *DownloadResponse) error) error + DownloadCleanup(ctx context.Context, actionID string) error } diff --git a/supernode/services/cascade/mocks/cascade_interfaces_mock.go b/supernode/services/cascade/mocks/cascade_interfaces_mock.go index 01bbd320..4cd51c65 100644 --- a/supernode/services/cascade/mocks/cascade_interfaces_mock.go +++ b/supernode/services/cascade/mocks/cascade_interfaces_mock.go @@ -86,6 +86,20 @@ func (mr *MockRegistrationTaskServiceMockRecorder) Download(ctx, req, send inter return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Download", reflect.TypeOf((*MockRegistrationTaskService)(nil).Download), ctx, req, send) } +// DownloadCleanup mocks base method. +func (m *MockRegistrationTaskService) DownloadCleanup(ctx context.Context, actionID string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DownloadCleanup", ctx, actionID) + ret0, _ := ret[0].(error) + return ret0 +} + +// DownloadCleanup indicates an expected call of DownloadCleanup. +func (mr *MockRegistrationTaskServiceMockRecorder) DownloadCleanup(ctx, actionID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadCleanup", reflect.TypeOf((*MockRegistrationTaskService)(nil).DownloadCleanup), ctx, actionID) +} + // HealthCheck mocks base method. func (m *MockRegistrationTaskService) HealthCheck(ctx context.Context) (cascade.HealthCheckResponse, error) { m.ctrl.T.Helper() diff --git a/tests/system/cli.go b/tests/system/cli.go index 9e42dd8c..633fe933 100644 --- a/tests/system/cli.go +++ b/tests/system/cli.go @@ -174,7 +174,7 @@ func (c LumeradCli) awaitTxCommitted(submitResp string, timeout ...time.Duration txHash := gjson.Get(submitResp, "txhash") require.True(c.t, txHash.Exists()) var txResult string - for i := 0; i < 3; i++ { // max blocks to wait for a commit + for i := 0; i < 10; i++ { // max blocks to wait for a commit txResult = c.WithRunErrorsIgnored().CustomQuery("q", "tx", txHash.String()) if code := gjson.Get(txResult, "code"); code.Exists() { if code.Int() != 0 { // 0 = success code diff --git a/tests/system/e2e_cascade_test.go b/tests/system/e2e_cascade_test.go index 6538f444..6402c6ad 100644 --- a/tests/system/e2e_cascade_test.go +++ b/tests/system/e2e_cascade_test.go @@ -362,8 +362,8 @@ func TestCascadeE2E(t *testing.T) { sut.AwaitNextBlock(t) // Verify the account can be queried with its public key - accountResp := cli.CustomQuery("q", "auth", "account", userAddress) - require.Contains(t, accountResp, "public_key", "User account public key should be available") + //accountResp := cli.CustomQuery("q", "auth", "account", userAddress) + //require.Contains(t, accountResp, "public_key", "User account public key should be available") // Extract transaction hash from response for verification txHash := txresp.TxHash @@ -381,6 +381,8 @@ func TestCascadeE2E(t *testing.T) { // --------------------------------------- // Step 8: Extract action ID and start cascade // --------------------------------------- + time.Sleep(30 * time.Second) + t.Log("Step 8: Extracting action ID and creating cascade request") // Extract action ID from transaction events @@ -557,6 +559,18 @@ func TestCascadeE2E(t *testing.T) { require.Equal(t, price, amount, "Payment amount should match action price") t.Log("Test completed successfully!") + + time.Sleep(1 * time.Minute) + + outputFileName := "output.txt" + outputFileFullpath := filepath.Join(t.TempDir(), outputFileName) + // Try to download the file using the action ID + dtaskID, err := actionClient.DownloadCascade(ctx, actionID, outputFileFullpath) + + t.Logf("Download response: %s", dtaskID) + require.NoError(t, err, "Failed to download cascade data using action ID") + + time.Sleep(30 * time.Second) // Wait to ensure all events are processed } func Blake3Hash(msg []byte) ([]byte, error) { hasher := blake3.New(32, nil) diff --git a/tests/system/system.go b/tests/system/system.go index 2904a984..8666c474 100644 --- a/tests/system/system.go +++ b/tests/system/system.go @@ -407,7 +407,7 @@ func (s *SystemUnderTest) AwaitBlockHeight(t *testing.T, targetHeight int64, tim // Returns the new height func (s *SystemUnderTest) AwaitNextBlock(t *testing.T, timeout ...time.Duration) int64 { t.Helper() - maxWaitTime := s.blockTime * 3 + maxWaitTime := s.blockTime * 10 if len(timeout) != 0 { // optional argument to overwrite default timeout maxWaitTime = timeout[0] } @@ -760,7 +760,7 @@ func NewEventListener(t *testing.T, rpcAddr string) *EventListener { return &EventListener{client: httpClient, t: t} } -var DefaultWaitTime = 30 * time.Second +var DefaultWaitTime = 60 * time.Second type ( CleanupFn func()