Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions sdk/action/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
//
//go:generate mockery --name=Client --output=testutil/mocks --outpkg=mocks --filename=client_mock.go
type Client interface {
StartCascade(ctx context.Context, data []byte, actionID string) (string, error)
StartCascade(ctx context.Context, filePath string, actionID string) (string, error)
DeleteTask(ctx context.Context, taskID string) error
GetTask(ctx context.Context, taskID string) (*task.TaskEntry, bool)
SubscribeToEvents(ctx context.Context, eventType event.EventType, handler event.Handler) error
Expand Down Expand Up @@ -54,17 +54,17 @@ func NewClient(ctx context.Context, config config.Config, logger log.Logger, key
}

// StartCascade initiates a cascade operation
func (c *ClientImpl) StartCascade(ctx context.Context, data []byte, actionID string) (string, error) {
func (c *ClientImpl) StartCascade(ctx context.Context, filePath string, actionID string) (string, error) {
if actionID == "" {
c.logger.Error(ctx, "Empty action ID provided")
return "", ErrEmptyActionID
}
if len(data) == 0 {
c.logger.Error(ctx, "Empty data provided")
if filePath == "" {
c.logger.Error(ctx, "Empty file path provided")
return "", ErrEmptyData
Copy link

Copilot AI May 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error returned for an empty file path still uses ErrEmptyData, which may cause confusion; consider defining and using a distinct ErrEmptyFilePath constant.

Suggested change
return "", ErrEmptyData
return "", ErrEmptyFilePath

Copilot uses AI. Check for mistakes.
}

taskID, err := c.taskManager.CreateCascadeTask(ctx, data, actionID)
taskID, err := c.taskManager.CreateCascadeTask(ctx, filePath, actionID)
if err != nil {
c.logger.Error(ctx, "Failed to create cascade task", "error", err)
return "", fmt.Errorf("failed to create cascade task: %w", err)
Expand Down
194 changes: 0 additions & 194 deletions sdk/action/client_test.go

This file was deleted.

52 changes: 32 additions & 20 deletions sdk/adapters/supernodeservice/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"os"

"github.com/LumeraProtocol/supernode/gen/supernode/action/cascade"
"github.com/LumeraProtocol/supernode/pkg/net"
Expand Down Expand Up @@ -42,30 +43,47 @@ func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *Casca
return nil, err
}

// Open the file for reading
file, err := os.Open(in.FilePath)
if err != nil {
a.logger.Error(ctx, "Failed to open file", "filePath", in.FilePath, "error", err)
return nil, fmt.Errorf("failed to open file: %w", err)
}
defer file.Close()

// Get file stats for progress tracking
fileInfo, err := file.Stat()
if err != nil {
a.logger.Error(ctx, "Failed to get file stats", "filePath", in.FilePath, "error", err)
return nil, fmt.Errorf("failed to get file stats: %w", err)
}
totalBytes := fileInfo.Size()

// Define chunk size
const chunkSize = 1024 // 1 KB

// Keep track of how much data we've processed
bytesRead := int64(0)
totalBytes := int64(len(in.Data))
chunkIndex := 0
buffer := make([]byte, chunkSize)

// Read and send data in chunks
for bytesRead < totalBytes {
// Determine size of the next chunk
end := bytesRead + chunkSize
if end > totalBytes {
end = totalBytes
for {
// Read a chunk from the file
n, err := file.Read(buffer)
if err == io.EOF {
break
}
if err != nil {
a.logger.Error(ctx, "Failed to read file chunk", "chunkIndex", chunkIndex, "error", err)
return nil, fmt.Errorf("failed to read file chunk: %w", err)
}

// Prepare the chunk data
chunkData := in.Data[bytesRead:end]

// Create the chunk request
// Create the chunk request with just the bytes we read
chunk := &cascade.RegisterRequest{
RequestType: &cascade.RegisterRequest_Chunk{
Chunk: &cascade.DataChunk{
Data: chunkData,
Data: buffer[:n],
},
},
}
Expand All @@ -75,10 +93,10 @@ func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *Casca
return nil, fmt.Errorf("failed to send chunk: %w", err)
}

bytesRead += int64(len(chunkData))
bytesRead += int64(n)
progress := float64(bytesRead) / float64(totalBytes) * 100

a.logger.Debug(ctx, "Sent data chunk", "chunkIndex", chunkIndex, "chunkSize", len(chunkData), "progress", fmt.Sprintf("%.1f%%", progress))
a.logger.Debug(ctx, "Sent data chunk", "chunkIndex", chunkIndex, "chunkSize", n, "progress", fmt.Sprintf("%.1f%%", progress))

chunkIndex++
}
Expand Down Expand Up @@ -117,13 +135,7 @@ func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *Casca
}

// Log the streamed progress update
a.logger.Info(ctx, "Supernode progress update received",
"event_type", resp.EventType,
"message", resp.Message,
"tx_hash", resp.TxHash,
"task_id", in.TaskId,
"action_id", in.ActionID,
)
a.logger.Info(ctx, "Supernode progress update received", "event_type", resp.EventType, "message", resp.Message, "tx_hash", resp.TxHash, "task_id", in.TaskId, "action_id", in.ActionID)

if in.EventLogger != nil {
in.EventLogger(ctx, toSdkEvent(resp.EventType), resp.Message, nil)
Expand Down
2 changes: 1 addition & 1 deletion sdk/adapters/supernodeservice/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type LoggerFunc func(
)

type CascadeSupernodeRegisterRequest struct {
Data []byte
FilePath string
ActionID string
TaskId string
EventLogger LoggerFunc
Expand Down
8 changes: 4 additions & 4 deletions sdk/task/cascade.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ const (

// CascadeTask represents a cascade registration operation. It embeds
// BaseTask for shared task state and carries the cascade-specific inputs.
type CascadeTask struct {
	BaseTask
	filePath string // path of the file to register (streamed to supernodes in chunks)
	actionId string // identifier of the action this registration belongs to
}

// NewCascadeTask creates a new CascadeTask using a BaseTask plus cascade-specific parameters
func NewCascadeTask(base BaseTask, data []byte, actionId string) *CascadeTask {
func NewCascadeTask(base BaseTask, filePath string, actionId string) *CascadeTask {
return &CascadeTask{
BaseTask: base,
data: data,
filePath: filePath,
actionId: actionId,
}
}
Expand Down Expand Up @@ -153,7 +153,7 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum
clientFactory := net.NewClientFactory(ctx, t.logger, t.keyring, factoryCfg)

req := &supernodeservice.CascadeSupernodeRegisterRequest{
Data: t.data,
FilePath: t.filePath,
ActionID: t.ActionID,
TaskId: t.TaskID,
}
Expand Down
Loading