Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions pkg/cascade/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package cascade

const (
// SkipArtifactStorageHeader is propagated via gRPC metadata to indicate
// the supernode should bypass artifact persistence.
SkipArtifactStorageHeader = "x-lumera-skip-artifact-storage"
// SkipArtifactStorageHeaderValue marks the header value for opting out of storage.
SkipArtifactStorageHeaderValue = "true"
// LogFieldSkipStorage standardises the log key used when skip storage is triggered.
LogFieldSkipStorage = "skip_storage"
)

const (
// ArtifactStorageSkippedMessage is published when artifact persistence is skipped.
ArtifactStorageSkippedMessage = "Artifact storage skipped by client instruction"
)
94 changes: 53 additions & 41 deletions sdk/adapters/supernodeservice/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import (

"github.com/LumeraProtocol/supernode/v2/gen/supernode"
"github.com/LumeraProtocol/supernode/v2/gen/supernode/action/cascade"
cascadecommon "github.com/LumeraProtocol/supernode/v2/pkg/cascade"
"github.com/LumeraProtocol/supernode/v2/sdk/event"
"github.com/LumeraProtocol/supernode/v2/sdk/log"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

type cascadeAdapter struct {
Expand Down Expand Up @@ -85,8 +87,18 @@ func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *Casca
phaseCtx, cancel := context.WithCancel(baseCtx)
defer cancel()

// Check if we should skip artifact storage on retry
callCtx := phaseCtx
if in.SkipArtifactStorage {
md := metadata.Pairs(
cascadecommon.SkipArtifactStorageHeader,
cascadecommon.SkipArtifactStorageHeaderValue,
)
callCtx = metadata.NewOutgoingContext(callCtx, md)
}

// Create the client stream
stream, err := a.client.Register(phaseCtx, opts...)
stream, err := a.client.Register(callCtx, opts...)
if err != nil {
a.logger.Error(ctx, "Failed to create register stream", "error", err)
if in.EventLogger != nil {
Expand Down Expand Up @@ -395,9 +407,9 @@ func (a *cascadeAdapter) GetSupernodeStatus(ctx context.Context) (SupernodeStatu

// CascadeSupernodeDownload downloads a file from a supernode gRPC stream
func (a *cascadeAdapter) CascadeSupernodeDownload(
ctx context.Context,
in *CascadeSupernodeDownloadRequest,
opts ...grpc.CallOption,
ctx context.Context,
in *CascadeSupernodeDownloadRequest,
opts ...grpc.CallOption,
) (*CascadeSupernodeDownloadResponse, error) {

// Use provided context as-is (no correlation IDs)
Expand Down Expand Up @@ -426,11 +438,11 @@ func (a *cascadeAdapter) CascadeSupernodeDownload(
}
defer outFile.Close()

var (
bytesWritten int64
chunkIndex int
startedEmitted bool
)
var (
bytesWritten int64
chunkIndex int
startedEmitted bool
)

// 3. Receive streamed responses
for {
Expand All @@ -456,21 +468,21 @@ func (a *cascadeAdapter) CascadeSupernodeDownload(
})
}

// 3b. Actual data chunk
case *cascade.DownloadResponse_Chunk:
data := x.Chunk.Data
if len(data) == 0 {
continue
}
if !startedEmitted {
if in.EventLogger != nil {
in.EventLogger(ctx, event.SDKDownloadStarted, "Download started", event.EventData{event.KeyActionID: in.ActionID})
}
startedEmitted = true
}
if _, err := outFile.Write(data); err != nil {
return nil, fmt.Errorf("write chunk: %w", err)
}
// 3b. Actual data chunk
case *cascade.DownloadResponse_Chunk:
data := x.Chunk.Data
if len(data) == 0 {
continue
}
if !startedEmitted {
if in.EventLogger != nil {
in.EventLogger(ctx, event.SDKDownloadStarted, "Download started", event.EventData{event.KeyActionID: in.ActionID})
}
startedEmitted = true
}
if _, err := outFile.Write(data); err != nil {
return nil, fmt.Errorf("write chunk: %w", err)
}

bytesWritten += int64(len(data))
chunkIndex++
Expand All @@ -481,19 +493,19 @@ func (a *cascadeAdapter) CascadeSupernodeDownload(

a.logger.Info(ctx, "download complete", "bytes_written", bytesWritten, "path", in.OutputPath, "action_id", in.ActionID)

if in.EventLogger != nil {
in.EventLogger(ctx, event.SDKDownloadCompleted, "Download completed", event.EventData{event.KeyActionID: in.ActionID, event.KeyOutputPath: in.OutputPath})
}
return &CascadeSupernodeDownloadResponse{
Success: true,
Message: "artefact downloaded",
OutputPath: in.OutputPath,
}, nil
if in.EventLogger != nil {
in.EventLogger(ctx, event.SDKDownloadCompleted, "Download completed", event.EventData{event.KeyActionID: in.ActionID, event.KeyOutputPath: in.OutputPath})
}
return &CascadeSupernodeDownloadResponse{
Success: true,
Message: "artifact downloaded",
OutputPath: in.OutputPath,
}, nil
}

// toSdkEvent converts a supernode-side enum value into an internal SDK EventType.
func toSdkEvent(e cascade.SupernodeEventType) event.EventType {
switch e {
switch e {
case cascade.SupernodeEventType_ACTION_RETRIEVED:
return event.SupernodeActionRetrieved
case cascade.SupernodeEventType_ACTION_FEE_VERIFIED:
Expand All @@ -516,14 +528,14 @@ func toSdkEvent(e cascade.SupernodeEventType) event.EventType {
return event.SupernodeArtefactsStored
case cascade.SupernodeEventType_ACTION_FINALIZED:
return event.SupernodeActionFinalized
case cascade.SupernodeEventType_ARTEFACTS_DOWNLOADED:
return event.SupernodeArtefactsDownloaded
case cascade.SupernodeEventType_NETWORK_RETRIEVE_STARTED:
return event.SupernodeNetworkRetrieveStarted
case cascade.SupernodeEventType_DECODE_COMPLETED:
return event.SupernodeDecodeCompleted
case cascade.SupernodeEventType_SERVE_READY:
return event.SupernodeServeReady
case cascade.SupernodeEventType_ARTEFACTS_DOWNLOADED:
return event.SupernodeArtefactsDownloaded
case cascade.SupernodeEventType_NETWORK_RETRIEVE_STARTED:
return event.SupernodeNetworkRetrieveStarted
case cascade.SupernodeEventType_DECODE_COMPLETED:
return event.SupernodeDecodeCompleted
case cascade.SupernodeEventType_SERVE_READY:
return event.SupernodeServeReady
case cascade.SupernodeEventType_FINALIZE_SIMULATED:
return event.SupernodeFinalizeSimulated
case cascade.SupernodeEventType_FINALIZE_SIMULATION_FAILED:
Expand Down
147 changes: 74 additions & 73 deletions sdk/adapters/supernodeservice/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ type LoggerFunc func(
)

type CascadeSupernodeRegisterRequest struct {
FilePath string
ActionID string
TaskId string
EventLogger LoggerFunc
FilePath string
ActionID string
TaskId string
SkipArtifactStorage bool
EventLogger LoggerFunc
}

type CascadeSupernodeRegisterResponse struct {
Expand All @@ -45,75 +46,75 @@ type StorageInfo struct {
}

type SupernodeStatusresponse struct {
Version string // Supernode version
UptimeSeconds uint64 // Uptime in seconds
Resources struct {
CPU struct {
UsagePercent float64
Cores int32
}
Memory struct {
TotalGB float64
UsedGB float64
AvailableGB float64
UsagePercent float64
}
Storage []StorageInfo
HardwareSummary string // Formatted hardware summary
}
RunningTasks []ServiceTasks // Services with running tasks
RegisteredServices []string // All available service names
Network struct {
PeersCount int32 // Number of connected peers
PeerAddresses []string // List of peer addresses
}
Rank int32 // Rank in top supernodes list (0 if not in top list)
IPAddress string // Supernode IP address with port
// Optional detailed P2P metrics (present when requested)
P2PMetrics struct {
DhtMetrics struct {
StoreSuccessRecent []struct {
TimeUnix int64
Requests int32
Successful int32
SuccessRate float64
}
BatchRetrieveRecent []struct {
TimeUnix int64
Keys int32
Required int32
FoundLocal int32
FoundNetwork int32
DurationMS int64
}
HotPathBannedSkips int64
HotPathBanIncrements int64
}
NetworkHandleMetrics map[string]struct{
Total int64
Success int64
Failure int64
Timeout int64
}
ConnPoolMetrics map[string]int64
BanList []struct {
ID string
IP string
Port uint32
Count int32
CreatedAtUnix int64
AgeSeconds int64
}
Database struct {
P2PDBSizeMB float64
P2PDBRecordsCount int64
}
Disk struct {
AllMB float64
UsedMB float64
FreeMB float64
}
}
Version string // Supernode version
UptimeSeconds uint64 // Uptime in seconds
Resources struct {
CPU struct {
UsagePercent float64
Cores int32
}
Memory struct {
TotalGB float64
UsedGB float64
AvailableGB float64
UsagePercent float64
}
Storage []StorageInfo
HardwareSummary string // Formatted hardware summary
}
RunningTasks []ServiceTasks // Services with running tasks
RegisteredServices []string // All available service names
Network struct {
PeersCount int32 // Number of connected peers
PeerAddresses []string // List of peer addresses
}
Rank int32 // Rank in top supernodes list (0 if not in top list)
IPAddress string // Supernode IP address with port
// Optional detailed P2P metrics (present when requested)
P2PMetrics struct {
DhtMetrics struct {
StoreSuccessRecent []struct {
TimeUnix int64
Requests int32
Successful int32
SuccessRate float64
}
BatchRetrieveRecent []struct {
TimeUnix int64
Keys int32
Required int32
FoundLocal int32
FoundNetwork int32
DurationMS int64
}
HotPathBannedSkips int64
HotPathBanIncrements int64
}
NetworkHandleMetrics map[string]struct {
Total int64
Success int64
Failure int64
Timeout int64
}
ConnPoolMetrics map[string]int64
BanList []struct {
ID string
IP string
Port uint32
Count int32
CreatedAtUnix int64
AgeSeconds int64
}
Database struct {
P2PDBSizeMB float64
P2PDBRecordsCount int64
}
Disk struct {
AllMB float64
UsedMB float64
FreeMB float64
}
}
}
type CascadeSupernodeDownloadRequest struct {
ActionID string
Expand Down
40 changes: 23 additions & 17 deletions sdk/task/cascade.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ import (

type CascadeTask struct {
BaseTask
filePath string
actionId string
filePath string
actionId string
skipArtifactStorage bool
}

// NewCascadeTask creates a new CascadeTask using a BaseTask plus cascade-specific parameters
Expand Down Expand Up @@ -74,6 +75,7 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum

var lastErr error
for idx, sn := range supernodes {
req.SkipArtifactStorage = t.skipArtifactStorage
// 1
t.LogEvent(ctx, event.SDKRegistrationAttempt, "attempting registration with supernode", event.EventData{
event.KeySupernode: sn.GrpcEndpoint,
Expand Down Expand Up @@ -103,21 +105,25 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum
}

func (t *CascadeTask) attemptRegistration(ctx context.Context, _ int, sn lumera.Supernode, factory *net.ClientFactory, req *supernodeservice.CascadeSupernodeRegisterRequest) error {
client, err := factory.CreateClient(ctx, sn)
if err != nil {
return fmt.Errorf("create client %s: %w", sn.CosmosAddress, err)
}
defer client.Close(ctx)

// Emit connection established event for observability
t.LogEvent(ctx, event.SDKConnectionEstablished, "Connection to supernode established", event.EventData{
event.KeySupernode: sn.GrpcEndpoint,
event.KeySupernodeAddress: sn.CosmosAddress,
})

req.EventLogger = func(ctx context.Context, evt event.EventType, msg string, data event.EventData) {
t.LogEvent(ctx, evt, msg, data)
}
client, err := factory.CreateClient(ctx, sn)
if err != nil {
return fmt.Errorf("create client %s: %w", sn.CosmosAddress, err)
}
defer client.Close(ctx)

// Emit connection established event for observability
t.LogEvent(ctx, event.SDKConnectionEstablished, "Connection to supernode established", event.EventData{
event.KeySupernode: sn.GrpcEndpoint,
event.KeySupernodeAddress: sn.CosmosAddress,
})

req.EventLogger = func(ctx context.Context, evt event.EventType, msg string, data event.EventData) {
if evt == event.SupernodeArtefactsStored {
t.skipArtifactStorage = true
req.SkipArtifactStorage = true
}
t.LogEvent(ctx, evt, msg, data)
}
// Use ctx directly; per-phase timers are applied inside the adapter
resp, err := client.RegisterCascade(ctx, req)
if err != nil {
Expand Down
Loading