Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 61 additions & 51 deletions gen/supernode/action/cascade/service.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions proto/supernode/action/cascade/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ message RegisterResponse {

message DownloadRequest {
string action_id = 1;
string signature = 2;
}

message DownloadResponse {
Expand Down
6 changes: 3 additions & 3 deletions sdk/action/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Client interface {
SubscribeToAllEvents(ctx context.Context, handler event.Handler) error
GetSupernodeStatus(ctx context.Context, supernodeAddress string) (*supernodeservice.SupernodeStatusresponse, error)
// DownloadCascade downloads cascade to outputDir, filename determined by action ID
DownloadCascade(ctx context.Context, actionID, outputDir string) (string, error)
DownloadCascade(ctx context.Context, actionID, outputDir, signature string) (string, error)
}

// ClientImpl implements the Client interface
Expand Down Expand Up @@ -205,13 +205,13 @@ func (c *ClientImpl) GetSupernodeStatus(ctx context.Context, supernodeAddress st
return status, nil
}

func (c *ClientImpl) DownloadCascade(ctx context.Context, actionID, outputDir string) (string, error) {
func (c *ClientImpl) DownloadCascade(ctx context.Context, actionID, outputDir, signature string) (string, error) {

if actionID == "" {
return "", fmt.Errorf("actionID is empty")
}

taskID, err := c.taskManager.CreateDownloadTask(ctx, actionID, outputDir)
taskID, err := c.taskManager.CreateDownloadTask(ctx, actionID, outputDir, signature)
if err != nil {
return "", fmt.Errorf("create download task: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion sdk/adapters/supernodeservice/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ func (a *cascadeAdapter) CascadeSupernodeDownload(

// 1. Open gRPC stream (server-stream)
stream, err := a.client.Download(ctx, &cascade.DownloadRequest{
ActionId: in.ActionID,
ActionId: in.ActionID,
Signature: in.Signature,
}, opts...)
if err != nil {
a.logger.Error(ctx, "failed to create download stream", "action_id", in.ActionID, "error", err)
Expand Down
1 change: 1 addition & 0 deletions sdk/adapters/supernodeservice/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type CascadeSupernodeDownloadRequest struct {
ActionID string
TaskID string
OutputPath string
Signature string
EventLogger LoggerFunc
}

Expand Down
6 changes: 5 additions & 1 deletion sdk/task/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ type CascadeDownloadTask struct {
BaseTask
actionId string
outputPath string
signature string
}

func NewCascadeDownloadTask(base BaseTask, actionId string, outputPath string) *CascadeDownloadTask {
func NewCascadeDownloadTask(base BaseTask, actionId string, outputPath string, signature string) *CascadeDownloadTask {
return &CascadeDownloadTask{
BaseTask: base,
actionId: actionId,
outputPath: outputPath,
signature: signature,
}
}

Expand Down Expand Up @@ -64,6 +66,7 @@ func (t *CascadeDownloadTask) downloadFromSupernodes(ctx context.Context, supern
ActionID: t.actionId,
TaskID: t.TaskID,
OutputPath: t.outputPath,
Signature: t.signature,
}

// Process supernodes in pairs
Expand Down Expand Up @@ -192,6 +195,7 @@ func (t *CascadeDownloadTask) attemptConcurrentDownload(
ActionID: req.ActionID,
TaskID: req.TaskID,
OutputPath: req.OutputPath,
Signature: req.Signature,
}

err := t.attemptDownload(batchCtx, sn, factory, reqCopy)
Expand Down
7 changes: 3 additions & 4 deletions sdk/task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Manager interface {
SubscribeToEvents(ctx context.Context, eventType event.EventType, handler event.Handler)
SubscribeToAllEvents(ctx context.Context, handler event.Handler)

CreateDownloadTask(ctx context.Context, actionID, outputPath string) (string, error)
CreateDownloadTask(ctx context.Context, actionID, outputPath, signature string) (string, error)
}

type ManagerImpl struct {
Expand Down Expand Up @@ -241,13 +241,12 @@ func (m *ManagerImpl) Close(ctx context.Context) {
}
}

func (m *ManagerImpl) CreateDownloadTask(ctx context.Context, actionID string, outputDir string) (string, error) {
func (m *ManagerImpl) CreateDownloadTask(ctx context.Context, actionID string, outputDir string, signature string) (string, error) {
// First validate the action before creating the task
action, err := m.validateDownloadAction(ctx, actionID)
if err != nil {
return "", err
}

// Decode metadata to get the filename
metadata, err := m.lumeraClient.DecodeCascadeMetadata(ctx, action)
if err != nil {
Expand Down Expand Up @@ -279,7 +278,7 @@ func (m *ManagerImpl) CreateDownloadTask(ctx context.Context, actionID string, o
}

// Use the final output path with the correct filename
task := NewCascadeDownloadTask(baseTask, actionID, finalOutputPath)
task := NewCascadeDownloadTask(baseTask, actionID, finalOutputPath, signature)

// Store task in cache
m.taskCache.Set(ctx, taskID, task, TaskTypeCascade, actionID)
Expand Down
34 changes: 25 additions & 9 deletions supernode/node/action/server/cascade/cascade_action_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ func NewCascadeActionServer(factory cascadeService.CascadeServiceFactory) *Actio
// to balance throughput and memory usage
func calculateOptimalChunkSize(fileSize int64) int {
const (
minChunkSize = 64 * 1024 // 64 KB minimum
maxChunkSize = 4 * 1024 * 1024 // 4 MB maximum for 1GB+ files
smallFileThreshold = 1024 * 1024 // 1 MB
minChunkSize = 64 * 1024 // 64 KB minimum
maxChunkSize = 4 * 1024 * 1024 // 4 MB maximum for 1GB+ files
smallFileThreshold = 1024 * 1024 // 1 MB
mediumFileThreshold = 50 * 1024 * 1024 // 50 MB
largeFileThreshold = 500 * 1024 * 1024 // 500 MB
largeFileThreshold = 500 * 1024 * 1024 // 500 MB
)

var chunkSize int

switch {
case fileSize <= smallFileThreshold:
// For small files (up to 1MB), use 64KB chunks
Expand All @@ -51,15 +51,15 @@ func calculateOptimalChunkSize(fileSize int64) int {
// For very large files (500MB+), use 4MB chunks for optimal throughput
chunkSize = maxChunkSize
}

// Ensure chunk size is within bounds
if chunkSize < minChunkSize {
chunkSize = minChunkSize
}
if chunkSize > maxChunkSize {
chunkSize = maxChunkSize
}

return chunkSize
}

Expand Down Expand Up @@ -219,6 +219,22 @@ func (server *ActionServer) Download(req *pb.DownloadRequest, stream pb.CascadeS

task := server.factory.NewCascadeRegistrationTask()

// Verify signature if provided
if req.GetSignature() != "" {
// Cast to concrete type to access helper method
if cascadeTask, ok := task.(*cascadeService.CascadeRegistrationTask); ok {
err := cascadeTask.VerifyDownloadSignature(ctx, req.GetActionId(), req.GetSignature())
if err != nil {
fields[logtrace.FieldError] = err.Error()
logtrace.Error(ctx, "signature verification failed", fields)
return fmt.Errorf("signature verification failed: %w", err)
}
} else {
logtrace.Error(ctx, "unable to cast task to CascadeRegistrationTask", fields)
return fmt.Errorf("unable to verify signature: task type assertion failed")
}
}

var restoredFile []byte
var tmpDir string

Expand Down Expand Up @@ -257,9 +273,9 @@ func (server *ActionServer) Download(req *pb.DownloadRequest, stream pb.CascadeS

// Calculate optimal chunk size based on file size
chunkSize := calculateOptimalChunkSize(int64(len(restoredFile)))

logtrace.Info(ctx, "calculated optimal chunk size for download", logtrace.Fields{
"file_size": len(restoredFile),
"file_size": len(restoredFile),
"chunk_size": chunkSize,
})

Expand Down
35 changes: 35 additions & 0 deletions supernode/services/cascade/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,3 +313,38 @@ func decodeIndexFile(data string) (IndexFile, error) {
}
return indexFile, nil
}

// VerifyDownloadSignature verifies the download signature for actionID.creatorAddress
func (task *CascadeRegistrationTask) VerifyDownloadSignature(ctx context.Context, actionID, signature string) error {
fields := logtrace.Fields{
logtrace.FieldActionID: actionID,
logtrace.FieldMethod: "VerifyDownloadSignature",
}

// Get action details to extract creator address
actionDetails, err := task.LumeraClient.GetAction(ctx, actionID)
if err != nil {
return task.wrapErr(ctx, "failed to get action", err, fields)
}

creatorAddress := actionDetails.GetAction().Creator
fields["creator_address"] = creatorAddress

// Create the expected signature data: actionID.creatorAddress
signatureData := fmt.Sprintf("%s.%s", actionID, creatorAddress)
fields["signature_data"] = signatureData

// Decode the base64 signature
signatureBytes, err := base64.StdEncoding.DecodeString(signature)
if err != nil {
return task.wrapErr(ctx, "failed to decode signature from base64", err, fields)
}

// Verify the signature using Lumera client
if err := task.LumeraClient.Verify(ctx, creatorAddress, []byte(signatureData), signatureBytes); err != nil {
return task.wrapErr(ctx, "failed to verify download signature", err, fields)
}

logtrace.Info(ctx, "download signature successfully verified", fields)
return nil
}
Loading