Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 2 additions & 13 deletions sdk/action/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,7 @@ func NewClient(ctx context.Context, config config.Config, logger log.Logger, key
}

// StartCascade initiates a cascade operation
func (c *ClientImpl) StartCascade(ctx context.Context,
fileHash string,
actionID string,
filePath string,
signedData string,
) (string, error) {
c.logger.Debug(ctx, "Starting cascade operation",
"fileHash", fileHash,
"actionID", actionID,
"filePath", filePath,
)

func (c *ClientImpl) StartCascade(ctx context.Context, fileHash string, actionID string, filePath string, signedData string) (string, error) {
if fileHash == "" {
c.logger.Error(ctx, "Empty file hash provided")
return "", ErrEmptyFileHash
Expand All @@ -83,7 +72,7 @@ func (c *ClientImpl) StartCascade(ctx context.Context,
return "", ErrEmptyFileNotFound
}

taskID, err := c.taskManager.CreateCascadeTask(ctx, fileHash, actionID, filePath, signedData)
taskID, err := c.taskManager.CreateCascadeTask(ctx, actionID, filePath)
if err != nil {
c.logger.Error(ctx, "Failed to create cascade task", "error", err)
return "", fmt.Errorf("failed to create cascade task: %w", err)
Expand Down
60 changes: 16 additions & 44 deletions sdk/adapters/supernodeservice/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,13 @@ func NewCascadeAdapter(ctx context.Context, client cascade.CascadeServiceClient,
}
}

func (a *cascadeAdapter) UploadInputData(ctx context.Context, in *UploadInputDataRequest, opts ...grpc.CallOption) (*UploadInputDataResponse, error) {
a.logger.Debug(ctx, "Uploading input data through adapter",
"filename", in.Filename,
"actionID", in.ActionID,
"filePath", in.FilePath)
func (a *cascadeAdapter) RegisterCascade(ctx context.Context, in *RegisterCascadeRequest, opts ...grpc.CallOption) (*RegisterCascadeResponse, error) {
a.logger.Debug(ctx, "RegisterCascade through adapter", "task_id", in.TaskID, "actionID", in.ActionID, "filePath", in.FilePath)

// Open the file for reading
file, err := os.Open(in.FilePath)
if err != nil {
a.logger.Error(ctx, "Failed to open file for reading",
"filePath", in.FilePath,
"error", err)
return nil, fmt.Errorf("failed to open file: %w", err)
}
Expand All @@ -50,18 +46,16 @@ func (a *cascadeAdapter) UploadInputData(ctx context.Context, in *UploadInputDat
fileInfo, err := file.Stat()
if err != nil {
a.logger.Error(ctx, "Failed to get file stats",
"filePath", in.FilePath,
"error", err)
return nil, fmt.Errorf("failed to get file stats: %w", err)
}

fileSize := fileInfo.Size()
a.logger.Debug(ctx, "File opened for streaming",
"filePath", in.FilePath,
"fileSize", fileSize)

// Create the client stream
stream, err := a.client.UploadInputData(ctx, opts...)
stream, err := a.client.Register(ctx, opts...)
if err != nil {
a.logger.Error(ctx, "Failed to create upload stream",
"error", err)
Expand Down Expand Up @@ -90,77 +84,55 @@ func (a *cascadeAdapter) UploadInputData(ctx context.Context, in *UploadInputDat
}

// Only send what was actually read
chunk := &cascade.UploadInputDataRequest{
RequestType: &cascade.UploadInputDataRequest_Chunk{
chunk := &cascade.RegisterRequest{
RequestType: &cascade.RegisterRequest_Chunk{
Chunk: &cascade.DataChunk{
Data: buffer[:n],
},
},
}

if err := stream.Send(chunk); err != nil {
a.logger.Error(ctx, "Failed to send data chunk",
"chunkIndex", chunkIndex,
"error", err)
a.logger.Error(ctx, "Failed to send data chunk", "chunkIndex", chunkIndex, "error", err)
return nil, fmt.Errorf("failed to send chunk: %w", err)
}

bytesRead += int64(n)
progress := float64(bytesRead) / float64(fileSize) * 100

a.logger.Debug(ctx, "Sent data chunk",
"chunkIndex", chunkIndex,
"chunkSize", n,
"progress", fmt.Sprintf("%.1f%%", progress))
a.logger.Debug(ctx, "Sent data chunk", "chunkIndex", chunkIndex, "chunkSize", n, "progress", fmt.Sprintf("%.1f%%", progress))

chunkIndex++
}

// Send metadata as the final message
metadata := &cascade.UploadInputDataRequest{
RequestType: &cascade.UploadInputDataRequest_Metadata{
metadata := &cascade.RegisterRequest{
RequestType: &cascade.RegisterRequest_Metadata{
Metadata: &cascade.Metadata{
Filename: in.Filename,
ActionId: in.ActionID,
DataHash: in.DataHash,
RqMax: in.RqMax,
SignedData: in.SignedData,
TaskId: in.TaskID,
ActionId: in.ActionID,
},
},
}

if err := stream.Send(metadata); err != nil {
a.logger.Error(ctx, "Failed to send metadata",
"filename", in.Filename,
"actionID", in.ActionID,
"error", err)
a.logger.Error(ctx, "Failed to send metadata", "task_id", in.TaskID, "actionID", in.ActionID, "error", err)
return nil, fmt.Errorf("failed to send metadata: %w", err)
}

a.logger.Debug(ctx, "Sent metadata",
"filename", in.Filename,
"actionID", in.ActionID)

resp, err := stream.CloseAndRecv()
if err != nil {
a.logger.Error(ctx, "Failed to close stream and receive response",
"filename", in.Filename,
"actionID", in.ActionID,
"error", err)
a.logger.Error(ctx, "Failed to close stream and receive response", "task_id", in.TaskID, "actionID", in.ActionID, "error", err)
return nil, fmt.Errorf("failed to receive response: %w", err)
}

response := &UploadInputDataResponse{
response := &RegisterCascadeResponse{
Success: resp.Success,
Message: resp.Message,
}

a.logger.Info(ctx, "Successfully uploaded input data",
"filename", in.Filename,
"actionID", in.ActionID,
"fileSize", fileSize,
"success", resp.Success,
"message", resp.Message)
a.logger.Info(ctx, "Successfully Registered with Supernode", "task_id", in.TaskID, "actionID", in.ActionID, "fileSize", fileSize,
"success", resp.Success, "message", resp.Message)

return response, nil
}
15 changes: 6 additions & 9 deletions sdk/adapters/supernodeservice/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,17 @@ import (
"google.golang.org/grpc"
)

type UploadInputDataRequest struct {
Filename string
ActionID string
DataHash string
SignedData string
RqMax int32
FilePath string
type RegisterCascadeRequest struct {
ActionID string
TaskID string
FilePath string
}

type UploadInputDataResponse struct {
type RegisterCascadeResponse struct {
Success bool
Message string
}

type CascadeServiceClient interface {
UploadInputData(ctx context.Context, in *UploadInputDataRequest, opts ...grpc.CallOption) (*UploadInputDataResponse, error)
RegisterCascade(ctx context.Context, in *RegisterCascadeRequest, opts ...grpc.CallOption) (*RegisterCascadeResponse, error)
}
4 changes: 2 additions & 2 deletions sdk/net/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (

// SupernodeClient defines the interface for communicating with supernodes
type SupernodeClient interface {
// UploadInputData uploads input data for cascade processing
UploadInputData(ctx context.Context, in *supernodeservice.UploadInputDataRequest, opts ...grpc.CallOption) (*supernodeservice.UploadInputDataResponse, error)
//RegisterCascade uploads input data to Supernode for processing cascade request
RegisterCascade(ctx context.Context, in *supernodeservice.RegisterCascadeRequest, opts ...grpc.CallOption) (*supernodeservice.RegisterCascadeResponse, error)

// HealthCheck performs a health check on the supernode
HealthCheck(ctx context.Context) (*grpc_health_v1.HealthCheckResponse, error)
Expand Down
12 changes: 6 additions & 6 deletions sdk/net/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,18 +104,18 @@ func NewSupernodeClient(ctx context.Context, logger log.Logger, keyring keyring.
}, nil
}

// UploadInputData sends data to the supernode for cascade processing
func (c *supernodeClient) UploadInputData(ctx context.Context,
in *supernodeservice.UploadInputDataRequest, opts ...grpc.CallOption,
) (*supernodeservice.UploadInputDataResponse, error) {
// RegisterCascade sends data to the supernode for cascade processing
func (c *supernodeClient) RegisterCascade(ctx context.Context,
in *supernodeservice.RegisterCascadeRequest, opts ...grpc.CallOption,
) (*supernodeservice.RegisterCascadeResponse, error) {

resp, err := c.cascadeClient.UploadInputData(ctx, in, opts...)
resp, err := c.cascadeClient.RegisterCascade(ctx, in, opts...)
if err != nil {
return nil, fmt.Errorf("upload input data failed: %w", err)
}

c.logger.Info(ctx, "Input data uploaded successfully",
"actionID", in.ActionID, "filename", in.Filename, "filePath", in.FilePath)
"actionID", in.ActionID, "filePath", in.FilePath)

return resp, nil
}
Expand Down
37 changes: 11 additions & 26 deletions sdk/task/cascade.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"path/filepath"
"sync"
"time"

Expand All @@ -24,23 +23,14 @@ const (

type CascadeTask struct {
BaseTask
FileHash string
FilePath string
SignedData string
FilePath string
}

// NewCascadeTask creates a new CascadeTask using a BaseTask plus cascade-specific parameters
func NewCascadeTask(
base BaseTask,
fileHash string,
filePath string,
signedData string,
) *CascadeTask {
func NewCascadeTask(base BaseTask, filePath string) *CascadeTask {
return &CascadeTask{
BaseTask: base,
FileHash: fileHash,
FilePath: filePath,
SignedData: signedData,
BaseTask: base,
FilePath: filePath,
}
}

Expand Down Expand Up @@ -155,12 +145,10 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum
}
clientFactory := net.NewClientFactory(ctx, t.logger, t.keyring, factoryCfg)

req := &supernodeservice.UploadInputDataRequest{
Filename: filepath.Base(t.FilePath),
ActionID: t.ActionID,
DataHash: t.FileHash,
SignedData: t.SignedData,
FilePath: t.FilePath,
req := &supernodeservice.RegisterCascadeRequest{
ActionID: t.ActionID,
FilePath: t.FilePath,
TaskID: t.TaskID,
}

var lastErr error
Expand All @@ -175,13 +163,10 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum
return fmt.Errorf("failed to upload to all supernodes: %w", lastErr)
}
func (t *CascadeTask) attemptRegistration(ctx context.Context, index int, sn lumera.Supernode,
factory *net.ClientFactory, req *supernodeservice.UploadInputDataRequest) error {
factory *net.ClientFactory, req *supernodeservice.RegisterCascadeRequest) error {

t.logEvent(ctx, event.TaskProgressRegistrationInProgress, "attempting registration with supernode", map[string]interface{}{
"supernode": sn.GrpcEndpoint,
"sn-address": sn.CosmosAddress,
"iteration": index + 1,
})
"supernode": sn.GrpcEndpoint, "sn-address": sn.CosmosAddress, "iteration": index + 1})

client, err := factory.CreateClient(ctx, sn)
if err != nil {
Expand All @@ -192,7 +177,7 @@ func (t *CascadeTask) attemptRegistration(ctx context.Context, index int, sn lum
uploadCtx, cancel := context.WithTimeout(ctx, registrationTimeout)
defer cancel()

resp, err := client.UploadInputData(uploadCtx, req)
resp, err := client.RegisterCascade(uploadCtx, req)
if err != nil {
return fmt.Errorf("upload to %s: %w", sn.CosmosAddress, err)
}
Expand Down
16 changes: 4 additions & 12 deletions sdk/task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const MAX_EVENT_WORKERS = 100

// Manager handles task creation and management
type Manager interface {
CreateCascadeTask(ctx context.Context, fileHash, actionID, filePath string, signedData string) (string, error)
CreateCascadeTask(ctx context.Context, actionID, filePath string) (string, error)
GetTask(ctx context.Context, taskID string) (*TaskEntry, bool)
DeleteTask(ctx context.Context, taskID string) error
SubscribeToEvents(ctx context.Context, eventType event.EventType, handler event.Handler)
Expand Down Expand Up @@ -77,16 +77,8 @@ func NewManager(
}

// CreateCascadeTask creates and starts a Cascade task using the new pattern
func (m *ManagerImpl) CreateCascadeTask(
ctx context.Context,
fileHash string,
actionID string,
filePath string,
signedData string,
) (string, error) {
m.logger.Info(ctx, "Creating cascade task",
"fileHash", fileHash,
"actionID", actionID)
func (m *ManagerImpl) CreateCascadeTask(ctx context.Context, actionID string, filePath string) (string, error) {
m.logger.Info(ctx, "Creating cascade task", "filePath", filePath, "actionID", actionID)

// Generate task ID
// slice this to 8 bytes
Expand All @@ -105,7 +97,7 @@ func (m *ManagerImpl) CreateCascadeTask(
logger: m.logger,
}
// Create cascade-specific task
task := NewCascadeTask(baseTask, fileHash, filePath, signedData)
task := NewCascadeTask(baseTask, filePath)

// Store task in cache
m.taskCache.Set(ctx, taskID, task, TaskTypeCascade)
Expand Down
Loading