Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ go.work.sum
# env file
.env
/data
/tests/system/data
4 changes: 2 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
"type": "go",
"request": "launch",
"mode": "debug",
"program": "${workspaceFolder}/main.go",
"program": "${workspaceFolder}/supernode/main.go",
"env": {},
"args": [],
"args": ["start"],
"showLog": true
}
]
Expand Down
110 changes: 110 additions & 0 deletions actionsdk/action/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package action

import (
"context"
"fmt"

"action/config"
"action/event"
"action/task"

"github.com/LumeraProtocol/supernode/pkg/lumera"
)

type ActionClient interface {
// StartSense initiates a Sense operation and returns a unique task ID
StartSense(ctx context.Context, fileHash string, actionID string, filePath string) (string, error)

// StartCascade initiates a Cascade operation and returns a unique task ID
StartCascade(ctx context.Context, fileHash string, actionID string, filePath string) (string, error)

// SubscribeToEvents registers a handler for specific event types
SubscribeToEvents(eventType event.EventType, handler event.Handler)

// SubscribeToAllEvents registers a handler for all events
SubscribeToAllEvents(handler event.Handler)
}

type ActionClientImpl struct {
lumeraClient lumera.Client
config config.Config
taskManager task.Manager
}

func NewActionClient(lumeraClient lumera.Client, config config.Config) *ActionClientImpl {
// Create task manager with config
taskManager := task.NewManager(lumeraClient, config)

return &ActionClientImpl{
lumeraClient: lumeraClient,
config: config,
taskManager: taskManager,
}
}

// StartSense initiates a Sense operation
func (ac *ActionClientImpl) StartSense(
ctx context.Context,
fileHash string,
actionID string,
filePath string,
) (string, error) {

if fileHash == "" {
return "", ErrEmptyFileHash
}
if actionID == "" {
return "", ErrEmptyActionID
}
if filePath == "" {
return "", ErrEmptyFilePath
}

// Create and start the task
taskID, err := ac.taskManager.CreateSenseTask(ctx, fileHash, actionID, filePath)
if err != nil {
return "", fmt.Errorf("failed to create sense task: %w", err)
}

return taskID, nil
}

func (ac *ActionClientImpl) StartCascade(
ctx context.Context,
fileHash string,
actionID string,
filePath string,
) (string, error) {

if fileHash == "" {
return "", ErrEmptyFileHash
}
if actionID == "" {
return "", ErrEmptyActionID
}
if filePath == "" {
return "", ErrEmptyFilePath
}

// Create and start the task
taskID, err := ac.taskManager.CreateCascadeTask(ctx, fileHash, actionID, filePath)
if err != nil {
return "", fmt.Errorf("failed to create cascade task: %w", err)
}

return taskID, nil
}

// SubscribeToEvents registers a handler for specific event types
func (ac *ActionClientImpl) SubscribeToEvents(eventType event.EventType, handler event.Handler) {
if ac.config.EventBus != nil {
ac.config.EventBus.Subscribe(eventType, handler)
}
}

// SubscribeToAllEvents registers a handler for all events
func (ac *ActionClientImpl) SubscribeToAllEvents(handler event.Handler) {
if ac.config.EventBus != nil {
ac.config.EventBus.SubscribeAll(handler)
}
}
51 changes: 51 additions & 0 deletions actionsdk/action/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package action

import (
"errors"
"fmt"
)

var (
ErrEmptyFileHash = errors.New("file hash cannot be empty")
ErrEmptyActionID = errors.New("action ID cannot be empty")
ErrEmptyFilePath = errors.New("file path cannot be empty")
ErrNoValidAction = errors.New("no action found with the specified ID")
ErrInvalidAction = errors.New("action is not in a valid state")
ErrNoSupernodes = errors.New("no valid supernodes available")
ErrTaskCreation = errors.New("failed to create task")
ErrCommunication = errors.New("communication with supernode failed")
)

// SupernodeError represents an error related to supernode operations
type SupernodeError struct {
NodeID string
Message string
Err error
}

// Error returns the error message
func (e *SupernodeError) Error() string {
return fmt.Sprintf("supernode error (ID: %s): %s: %v", e.NodeID, e.Message, e.Err)
}

// Unwrap returns the underlying error
func (e *SupernodeError) Unwrap() error {
return e.Err
}

// ActionError represents an error related to action operations
type ActionError struct {
ActionID string
Message string
Err error
}

// Error returns the error message
func (e *ActionError) Error() string {
return fmt.Sprintf("action error (ID: %s): %s: %v", e.ActionID, e.Message, e.Err)
}

// Unwrap returns the underlying error
func (e *ActionError) Unwrap() error {
return e.Err
}
8 changes: 8 additions & 0 deletions actionsdk/action/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package action

type TaskType string

const (
TaskTypeSense TaskType = "SENSE"
TaskTypeCascade TaskType = "CASCADE"
)
91 changes: 91 additions & 0 deletions actionsdk/adapters/lumera/adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package lumera

import (
"context"
"fmt"

"github.com/LumeraProtocol/lumera/x/action/types"
sntypes "github.com/LumeraProtocol/lumera/x/supernode/types"
lumeraclient "github.com/LumeraProtocol/supernode/pkg/lumera"
)

type Client interface {
GetAction(ctx context.Context, actionID string) (Action, error)
GetSupernodes(ctx context.Context, height int64) ([]Supernode, error)
}

// Adapter adapts the lumera.Client to our Client interface
type Adapter struct {
client lumeraclient.Client
}

// NewAdapter creates a new adapter for the lumera.Client
func NewAdapter(client lumeraclient.Client) Client {
return &Adapter{
client: client,
}
}

// GetAction retrieves action information from the blockchain
func (a *Adapter) GetAction(ctx context.Context, actionID string) (Action, error) {
resp, err := a.client.Action().GetAction(ctx, actionID)
if err != nil {
return Action{}, fmt.Errorf("failed to get action: %w", err)
}

// Transform the response to our simplified Action type
return toSdkAction(resp), nil
}

// GetSupernodes retrieves a list of top supernodes at a given height
func (a *Adapter) GetSupernodes(ctx context.Context, height int64) ([]Supernode, error) {
resp, err := a.client.SuperNode().GetTopSuperNodesForBlock(ctx, uint64(height))
if err != nil {
return nil, fmt.Errorf("failed to get supernodes: %w", err)
}

// Transform the response to our simplified Supernode type
return toSdkSupernodes(resp), nil
}

func toSdkAction(resp *types.QueryGetActionResponse) Action {
return Action{
ID: resp.Action.ActionID,
State: ACTION_STATE(resp.Action.State),
Height: int64(resp.Action.BlockHeight),
ExpirationTime: resp.Action.ExpirationTime,
}
}

func toSdkSupernodes(resp *sntypes.QueryGetTopSuperNodesForBlockResponse) []Supernode {
var result []Supernode
for _, sn := range resp.Supernodes {
ipAddress, err := getLatestIP(sn)
if err != nil {
continue
}

if sn.SupernodeAccount == "" {
continue
}

if sn.States[0].State.String() != string(SUPERNODE_STATE_ACTIVE) {
continue
}

result = append(result, Supernode{
CosmosAddress: sn.SupernodeAccount,
GrpcEndpoint: ipAddress,
State: SUPERNODE_STATE_ACTIVE,
})
}
return result
}

func getLatestIP(supernode *sntypes.SuperNode) (string, error) {
if len(supernode.PrevIpAddresses) == 0 {
return "", fmt.Errorf("no ip history exists for the supernode")
}

return supernode.PrevIpAddresses[0].Address, nil
}
39 changes: 39 additions & 0 deletions actionsdk/adapters/lumera/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package lumera

// ACTION_STATE represents the possible states of an action
type ACTION_STATE string

const (
ACTION_STATE_UNSPECIFIED ACTION_STATE = "ACTION_STATE_UNSPECIFIED"
ACTION_STATE_PENDING ACTION_STATE = "ACTION_STATE_PENDING"
ACTION_STATE_DONE ACTION_STATE = "ACTION_STATE_DONE"
ACTION_STATE_APPROVED ACTION_STATE = "ACTION_STATE_APPROVED"
ACTION_STATE_REJECTED ACTION_STATE = "ACTION_STATE_REJECTED"
ACTION_STATE_FAILED ACTION_STATE = "ACTION_STATE_FAILED"
)

// SUPERNODE_STATE represents the possible states of a supernode
type SUPERNODE_STATE string

const (
SUPERNODE_STATE_UNSPECIFIED SUPERNODE_STATE = "SUPERNODE_STATE_UNSPECIFIED"
SUPERNODE_STATE_ACTIVE SUPERNODE_STATE = "SUPERNODE_STATE_ACTIVE"
SUPERNODE_STATE_DISABLED SUPERNODE_STATE = "SUPERNODE_STATE_DISABLED"
SUPERNODE_STATE_STOPPED SUPERNODE_STATE = "SUPERNODE_STATE_STOPPED"
SUPERNODE_STATE_PENALIZED SUPERNODE_STATE = "SUPERNODE_STATE_PENALIZED"
)

// Action represents an action registered on the Lumera blockchain
type Action struct {
ID string
State ACTION_STATE
Height int64
ExpirationTime string
}

// Supernode represents information about a supernode in the network
type Supernode struct {
CosmosAddress string // Blockchain identity of the supernode
GrpcEndpoint string // Network endpoint for gRPC communication
State SUPERNODE_STATE // Current state of the supernode
}
41 changes: 41 additions & 0 deletions actionsdk/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package config

import (
"action/event"

"github.com/LumeraProtocol/lumera/x/lumeraid/securekeyx"
"github.com/cosmos/cosmos-sdk/crypto/keyring"
)

// Config holds configuration values for the ActionClient
type Config struct {
// Security configuration
Keyring keyring.Keyring // Keyring containing identity keys
LocalCosmosAddress string // Local cosmos address for authentication
LocalPeerType securekeyx.PeerType // Local peer type (Simplenode for clients)

// Network configuration
DefaultSupernodePort int // Default port for supernode gRPC endpoints

// Task configuration
MaxRetries int // Maximum number of retries for supernode communication
TimeoutSeconds int // Timeout for supernode communication in seconds
SenseSupernodeCount int // Number of supernodes to select for Sense operations
CascadeSupernodeCount int // Number of supernodes to select for Cascade operations

// Event system
EventBus *event.Bus // Event bus for task events
}

// DefaultConfig returns a Config with default values
func DefaultConfig() Config {
return Config{
LocalPeerType: securekeyx.Simplenode,
DefaultSupernodePort: 50051,
MaxRetries: 3,
TimeoutSeconds: 30,
SenseSupernodeCount: 3,
CascadeSupernodeCount: 1,
EventBus: event.NewBus(),
}
}
Loading