Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
393 changes: 304 additions & 89 deletions gen/supernode/action/cascade/service.pb.go

Large diffs are not rendered by default.

41 changes: 41 additions & 0 deletions gen/supernode/action/cascade/service_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,5 @@ type EncodeRequest struct {
// Codec encodes a file into symbols and reconstructs a file back from them.
type Codec interface {
	// Encode a file
	Encode(ctx context.Context, req EncodeRequest) (EncodeResponse, error)
	// Decode reconstructs a file from previously encoded symbols.
	Decode(ctx context.Context, req DecodeRequest) (DecodeResponse, error)
}
23 changes: 16 additions & 7 deletions pkg/codec/codec_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 65 additions & 0 deletions pkg/codec/decode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package codec

import (
"context"
"fmt"
"os"
"path/filepath"

raptorq "github.com/LumeraProtocol/rq-go"
"github.com/LumeraProtocol/supernode/pkg/logtrace"
)

// DecodeRequest carries the inputs needed to reconstruct a file from its
// RaptorQ symbols.
type DecodeRequest struct {
	// ActionID scopes the decode; it is used as the on-disk directory name
	// under the codec's symbols base directory.
	ActionID string
	// Layout describes the symbol layout.
	// NOTE(review): the current Decode implementation never reads this
	// field — confirm whether it is still needed.
	Layout Layout
	// Symbols maps a symbol identifier (used verbatim as the file name on
	// disk) to the raw symbol bytes.
	Symbols map[string][]byte
}

// DecodeResponse reports where the reconstructed artefacts were written.
type DecodeResponse struct {
	// Path is the filesystem path of the reconstructed file.
	Path string
	// LayoutPath is the path of a recovered layout file, if any.
	// NOTE(review): the current Decode implementation always returns an
	// empty LayoutPath — confirm whether callers rely on it.
	LayoutPath string
}

// Decode reconstructs the original file for the given action from the
// RaptorQ symbols supplied in req. The symbols are first materialized on
// disk under <symbolsBaseDir>/<ActionID>, then handed to the RaptorQ
// processor, which writes the reconstructed file to
// <symbolsBaseDir>/<ActionID>/output.
//
// On success the returned DecodeResponse.Path points at the reconstructed
// file; LayoutPath is currently always empty.
func (rq *raptorQ) Decode(ctx context.Context, req DecodeRequest) (DecodeResponse, error) {
	fields := logtrace.Fields{
		logtrace.FieldMethod:   "Decode",
		logtrace.FieldModule:   "rq",
		logtrace.FieldActionID: req.ActionID,
	}
	logtrace.Info(ctx, "RaptorQ decode request received", fields)

	processor, err := raptorq.NewDefaultRaptorQProcessor()
	if err != nil {
		// Log with the enriched fields instead of silently discarding them.
		fields[logtrace.FieldError] = err.Error()
		logtrace.Error(ctx, "failed to create RaptorQ processor", fields)
		return DecodeResponse{}, fmt.Errorf("create RaptorQ processor: %w", err)
	}
	defer processor.Free()

	symbolsDir := filepath.Join(rq.symbolsBaseDir, req.ActionID)
	if err := os.MkdirAll(symbolsDir, 0o755); err != nil {
		fields[logtrace.FieldError] = err.Error()
		logtrace.Error(ctx, "failed to create symbols directory", fields)
		return DecodeResponse{}, fmt.Errorf("mkdir %s: %w", symbolsDir, err)
	}

	// Write symbols to disk; the symbol id doubles as the file name.
	for id, data := range req.Symbols {
		symbolPath := filepath.Join(symbolsDir, id)
		if err := os.WriteFile(symbolPath, data, 0o644); err != nil {
			fields[logtrace.FieldError] = err.Error()
			logtrace.Error(ctx, "failed to write symbol to disk", fields)
			return DecodeResponse{}, fmt.Errorf("write symbol %s: %w", id, err)
		}
	}
	logtrace.Info(ctx, "symbols written to disk", fields)

	// Decode.
	// NOTE(review): the output file is created inside the symbols
	// directory — confirm DecodeSymbols does not pick "output" up as a
	// symbol on a retried decode for the same action.
	outputPath := filepath.Join(symbolsDir, "output")
	if err := processor.DecodeSymbols(symbolsDir, outputPath, ""); err != nil {
		fields[logtrace.FieldError] = err.Error()
		logtrace.Error(ctx, "raptorq decode failed", fields)
		// Best-effort cleanup of a partially written output file.
		_ = os.Remove(outputPath)
		return DecodeResponse{}, fmt.Errorf("raptorq decode: %w", err)
	}

	logtrace.Info(ctx, "RaptorQ decoding completed successfully", fields)
	return DecodeResponse{Path: outputPath, LayoutPath: ""}, nil
}
1 change: 1 addition & 0 deletions pkg/logtrace/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ const (
FieldTaskID = "task_id"
FieldActionID = "action_id"
FieldHashHex = "hash_hex"
FieldActionState = "action_state"
)
15 changes: 15 additions & 0 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,21 @@ func ZstdCompress(data []byte) ([]byte, error) {
return encoder.EncodeAll(data, nil), nil
}

// ZstdDecompress decompresses zstd-compressed data and returns the
// original bytes.
func ZstdDecompress(data []byte) ([]byte, error) {
	// A nil reader is fine here: DecodeAll operates directly on the
	// supplied buffer, so no streaming source is needed.
	decoder, err := zstd.NewReader(nil)
	if err != nil {
		// Wrap with %w so callers can inspect the cause via errors.Is/As.
		return nil, fmt.Errorf("failed to create zstd decoder: %w", err)
	}
	defer decoder.Close()

	decoded, err := decoder.DecodeAll(data, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to decompress zstd data: %w", err)
	}

	return decoded, nil
}

// HighCompress compresses the data
func HighCompress(cctx context.Context, data []byte) ([]byte, error) {
ctx, cancel := context.WithTimeout(cctx, highCompressTimeout)
Expand Down
19 changes: 18 additions & 1 deletion proto/supernode/action/cascade/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ option go_package = "github.com/LumeraProtocol/supernode/gen/supernode/action/ca
// CascadeService handles cascade action registration, health checks and
// artefact downloads.
service CascadeService {
  rpc Register (stream RegisterRequest) returns (stream RegisterResponse);
  rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse);

  // Download streams progress events followed by the artefact data in chunks.
  rpc Download (DownloadRequest) returns (stream DownloadResponse);
}

message RegisterRequest {
Expand All @@ -30,6 +30,22 @@ message RegisterResponse {
string tx_hash = 3;
}

// DownloadRequest identifies the action whose artefact should be downloaded.
message DownloadRequest {
  // Identifier of the action to retrieve artefacts for.
  string action_id = 1;
}

// DownloadResponse is either a progress event or a chunk of artefact data.
message DownloadResponse {
  oneof response_type {
    // Progress/status event emitted while the artefact is being restored.
    DownloadEvent event = 1;
    // A piece of the restored artefact file (DataChunk is defined elsewhere).
    DataChunk chunk = 2;
  }
}

// DownloadEvent reports a stage of the download pipeline.
message DownloadEvent {
  // Stage reached (see SupernodeEventType).
  SupernodeEventType event_type = 1;
  // Human-readable status message.
  string message = 2;
}

enum SupernodeEventType {
UNKNOWN = 0;
ACTION_RETRIEVED = 1;
Expand All @@ -43,6 +59,7 @@ enum SupernodeEventType {
RQID_VERIFIED = 9;
ARTEFACTS_STORED = 10;
ACTION_FINALIZED = 11;
Artefacts_Downloaded = 12;
}

message HealthCheckRequest {}
Expand Down
74 changes: 74 additions & 0 deletions supernode/node/action/server/cascade/cascade_action_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,77 @@ func (server *ActionServer) HealthCheck(ctx context.Context, _ *pb.HealthCheckRe
TasksInProgress: resp.TasksInProgress,
}, nil
}

// Download streams the artefact for the requested action back to the
// client. It first forwards progress events produced by the task layer,
// then, once the restored file is available, streams its contents as a
// sequence of fixed-size data chunks.
func (server *ActionServer) Download(req *pb.DownloadRequest, stream pb.CascadeService_DownloadServer) error {
	fields := logtrace.Fields{
		logtrace.FieldMethod:   "Download",
		logtrace.FieldModule:   "CascadeActionServer",
		logtrace.FieldActionID: req.GetActionId(),
	}

	ctx := stream.Context()
	logtrace.Info(ctx, "download request received from client", fields)

	task := server.factory.NewCascadeRegistrationTask()

	// NOTE(review): the whole restored file is buffered in memory before
	// chunking — confirm artefact sizes stay small enough for this.
	var restoredFile []byte

	err := task.Download(ctx, &cascadeService.DownloadRequest{
		ActionID: req.GetActionId(),
	}, func(resp *cascadeService.DownloadResponse) error {
		// Forward each task-layer event to the client as it happens.
		grpcResp := &pb.DownloadResponse{
			ResponseType: &pb.DownloadResponse_Event{
				Event: &pb.DownloadEvent{
					EventType: pb.SupernodeEventType(resp.EventType),
					Message:   resp.Message,
				},
			},
		}

		// Capture the restored file when the task layer delivers it.
		if len(resp.Artefacts) > 0 {
			restoredFile = resp.Artefacts
		}

		return stream.Send(grpcResp)
	})

	if err != nil {
		// Log with the request-scoped fields so the error keeps its
		// method/module/action_id correlation context.
		fields[logtrace.FieldError] = err.Error()
		logtrace.Error(ctx, "error occurred during download process", fields)
		return err
	}

	if len(restoredFile) == 0 {
		logtrace.Error(ctx, "no artefact file retrieved", fields)
		return fmt.Errorf("no artefact to stream")
	}
	logtrace.Info(ctx, "streaming artefact file in chunks", fields)

	// Split and stream the file in 1024 byte chunks.
	const chunkSize = 1024
	for i := 0; i < len(restoredFile); i += chunkSize {
		end := i + chunkSize
		if end > len(restoredFile) {
			end = len(restoredFile)
		}

		chunk := &pb.DownloadResponse{
			ResponseType: &pb.DownloadResponse_Chunk{
				Chunk: &pb.DataChunk{
					Data: restoredFile[i:end],
				},
			},
		}
		if err := stream.Send(chunk); err != nil {
			fields[logtrace.FieldError] = err.Error()
			logtrace.Error(ctx, "failed to stream chunk", fields)
			return err
		}
	}

	logtrace.Info(ctx, "completed streaming all chunks", fields)
	return nil
}
Loading