From ffc0d33bbb0a7ddcc7821ac47cdb490223d19394 Mon Sep 17 00:00:00 2001 From: Matee Ullah Malik Date: Fri, 19 Sep 2025 15:32:55 +0500 Subject: [PATCH] Artifacts storage skipped if already stored on network --- pkg/cascade/constants.go | 16 ++ sdk/adapters/supernodeservice/adapter.go | 94 ++++++----- sdk/adapters/supernodeservice/types.go | 147 +++++++++--------- sdk/task/cascade.go | 40 +++-- .../server/cascade/cascade_action_server.go | 51 +++--- supernode/services/cascade/register.go | 30 ++-- 6 files changed, 216 insertions(+), 162 deletions(-) create mode 100644 pkg/cascade/constants.go diff --git a/pkg/cascade/constants.go b/pkg/cascade/constants.go new file mode 100644 index 00000000..1bee9824 --- /dev/null +++ b/pkg/cascade/constants.go @@ -0,0 +1,16 @@ +package cascade + +const ( + // SkipArtifactStorageHeader is propagated via gRPC metadata to indicate + // the supernode should bypass artifact persistence. + SkipArtifactStorageHeader = "x-lumera-skip-artifact-storage" + // SkipArtifactStorageHeaderValue marks the header value for opting out of storage. + SkipArtifactStorageHeaderValue = "true" + // LogFieldSkipStorage standardises the log key used when skip storage is triggered. + LogFieldSkipStorage = "skip_storage" +) + +const ( + // ArtifactStorageSkippedMessage is published when artifact persistence is skipped. + ArtifactStorageSkippedMessage = "Artifact storage skipped by client instruction" +) diff --git a/sdk/adapters/supernodeservice/adapter.go b/sdk/adapters/supernodeservice/adapter.go index 3b6b61dc..afb1ac3d 100644 --- a/sdk/adapters/supernodeservice/adapter.go +++ b/sdk/adapters/supernodeservice/adapter.go @@ -12,10 +12,12 @@ import ( "github.com/LumeraProtocol/supernode/v2/gen/supernode" "github.com/LumeraProtocol/supernode/v2/gen/supernode/action/cascade" + cascadecommon "github.com/LumeraProtocol/supernode/v2/pkg/cascade" "github.com/LumeraProtocol/supernode/v2/sdk/event" "github.com/LumeraProtocol/supernode/v2/sdk/log" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" ) type cascadeAdapter struct { @@ -85,8 +87,18 @@ func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *Casca phaseCtx, cancel := context.WithCancel(baseCtx) defer cancel() + // Check if we should skip artifact storage on retry + callCtx := phaseCtx + if in.SkipArtifactStorage { + md := metadata.Pairs( + cascadecommon.SkipArtifactStorageHeader, + cascadecommon.SkipArtifactStorageHeaderValue, + ) + callCtx = metadata.NewOutgoingContext(callCtx, md) + } + // Create the client stream - stream, err := a.client.Register(phaseCtx, opts...) + stream, err := a.client.Register(callCtx, opts...) if err != nil { a.logger.Error(ctx, "Failed to create register stream", "error", err) if in.EventLogger != nil { @@ -395,9 +407,9 @@ func (a *cascadeAdapter) GetSupernodeStatus(ctx context.Context) (SupernodeStatu // CascadeSupernodeDownload downloads a file from a supernode gRPC stream func (a *cascadeAdapter) CascadeSupernodeDownload( - ctx context.Context, - in *CascadeSupernodeDownloadRequest, - opts ...grpc.CallOption, + ctx context.Context, + in *CascadeSupernodeDownloadRequest, + opts ...grpc.CallOption, ) (*CascadeSupernodeDownloadResponse, error) { // Use provided context as-is (no correlation IDs) @@ -426,11 +438,11 @@ func (a *cascadeAdapter) CascadeSupernodeDownload( } defer outFile.Close() - var ( - bytesWritten int64 - chunkIndex int - startedEmitted bool - ) + var ( + bytesWritten int64 + chunkIndex int + startedEmitted bool + ) // 3. Receive streamed responses for { @@ -456,21 +468,21 @@ func (a *cascadeAdapter) CascadeSupernodeDownload( }) } - // 3b. Actual data chunk - case *cascade.DownloadResponse_Chunk: - data := x.Chunk.Data - if len(data) == 0 { - continue - } - if !startedEmitted { - if in.EventLogger != nil { - in.EventLogger(ctx, event.SDKDownloadStarted, "Download started", event.EventData{event.KeyActionID: in.ActionID}) - } - startedEmitted = true - } - if _, err := outFile.Write(data); err != nil { - return nil, fmt.Errorf("write chunk: %w", err) - } + // 3b. Actual data chunk + case *cascade.DownloadResponse_Chunk: + data := x.Chunk.Data + if len(data) == 0 { + continue + } + if !startedEmitted { + if in.EventLogger != nil { + in.EventLogger(ctx, event.SDKDownloadStarted, "Download started", event.EventData{event.KeyActionID: in.ActionID}) + } + startedEmitted = true + } + if _, err := outFile.Write(data); err != nil { + return nil, fmt.Errorf("write chunk: %w", err) + } bytesWritten += int64(len(data)) chunkIndex++ @@ -481,19 +493,19 @@ func (a *cascadeAdapter) CascadeSupernodeDownload( a.logger.Info(ctx, "download complete", "bytes_written", bytesWritten, "path", in.OutputPath, "action_id", in.ActionID) - if in.EventLogger != nil { - in.EventLogger(ctx, event.SDKDownloadCompleted, "Download completed", event.EventData{event.KeyActionID: in.ActionID, event.KeyOutputPath: in.OutputPath}) - } - return &CascadeSupernodeDownloadResponse{ - Success: true, - Message: "artefact downloaded", - OutputPath: in.OutputPath, - }, nil + if in.EventLogger != nil { + in.EventLogger(ctx, event.SDKDownloadCompleted, "Download completed", event.EventData{event.KeyActionID: in.ActionID, event.KeyOutputPath: in.OutputPath}) + } + return &CascadeSupernodeDownloadResponse{ + Success: true, + Message: "artifact downloaded", + OutputPath: in.OutputPath, + }, nil } // toSdkEvent converts a supernode-side enum value into an internal SDK EventType. func toSdkEvent(e cascade.SupernodeEventType) event.EventType { - switch e { + switch e { case cascade.SupernodeEventType_ACTION_RETRIEVED: return event.SupernodeActionRetrieved case cascade.SupernodeEventType_ACTION_FEE_VERIFIED: @@ -516,14 +528,14 @@ func toSdkEvent(e cascade.SupernodeEventType) event.EventType { return event.SupernodeArtefactsStored case cascade.SupernodeEventType_ACTION_FINALIZED: return event.SupernodeActionFinalized - case cascade.SupernodeEventType_ARTEFACTS_DOWNLOADED: - return event.SupernodeArtefactsDownloaded - case cascade.SupernodeEventType_NETWORK_RETRIEVE_STARTED: - return event.SupernodeNetworkRetrieveStarted - case cascade.SupernodeEventType_DECODE_COMPLETED: - return event.SupernodeDecodeCompleted - case cascade.SupernodeEventType_SERVE_READY: - return event.SupernodeServeReady + case cascade.SupernodeEventType_ARTEFACTS_DOWNLOADED: + return event.SupernodeArtefactsDownloaded + case cascade.SupernodeEventType_NETWORK_RETRIEVE_STARTED: + return event.SupernodeNetworkRetrieveStarted + case cascade.SupernodeEventType_DECODE_COMPLETED: + return event.SupernodeDecodeCompleted + case cascade.SupernodeEventType_SERVE_READY: + return event.SupernodeServeReady case cascade.SupernodeEventType_FINALIZE_SIMULATED: return event.SupernodeFinalizeSimulated case cascade.SupernodeEventType_FINALIZE_SIMULATION_FAILED: diff --git a/sdk/adapters/supernodeservice/types.go b/sdk/adapters/supernodeservice/types.go index aa76e70d..36a06271 100644 --- a/sdk/adapters/supernodeservice/types.go +++ b/sdk/adapters/supernodeservice/types.go @@ -16,10 +16,11 @@ type LoggerFunc func( ) type CascadeSupernodeRegisterRequest struct { - FilePath string - ActionID string - TaskId string - EventLogger LoggerFunc + FilePath string + ActionID string + TaskId string + SkipArtifactStorage bool + EventLogger LoggerFunc } type CascadeSupernodeRegisterResponse struct { @@ -45,75 +46,75 @@ type StorageInfo struct { } type SupernodeStatusresponse struct { - Version string // Supernode version - UptimeSeconds uint64 // Uptime in seconds - Resources struct { - CPU struct { - UsagePercent float64 - Cores int32 - } - Memory struct { - TotalGB float64 - UsedGB float64 - AvailableGB float64 - UsagePercent float64 - } - Storage []StorageInfo - HardwareSummary string // Formatted hardware summary - } - RunningTasks []ServiceTasks // Services with running tasks - RegisteredServices []string // All available service names - Network struct { - PeersCount int32 // Number of connected peers - PeerAddresses []string // List of peer addresses - } - Rank int32 // Rank in top supernodes list (0 if not in top list) - IPAddress string // Supernode IP address with port - // Optional detailed P2P metrics (present when requested) - P2PMetrics struct { - DhtMetrics struct { - StoreSuccessRecent []struct { - TimeUnix int64 - Requests int32 - Successful int32 - SuccessRate float64 - } - BatchRetrieveRecent []struct { - TimeUnix int64 - Keys int32 - Required int32 - FoundLocal int32 - FoundNetwork int32 - DurationMS int64 - } - HotPathBannedSkips int64 - HotPathBanIncrements int64 - } - NetworkHandleMetrics map[string]struct{ - Total int64 - Success int64 - Failure int64 - Timeout int64 - } - ConnPoolMetrics map[string]int64 - BanList []struct { - ID string - IP string - Port uint32 - Count int32 - CreatedAtUnix int64 - AgeSeconds int64 - } - Database struct { - P2PDBSizeMB float64 - P2PDBRecordsCount int64 - } - Disk struct { - AllMB float64 - UsedMB float64 - FreeMB float64 - } - } + Version string // Supernode version + UptimeSeconds uint64 // Uptime in seconds + Resources struct { + CPU struct { + UsagePercent float64 + Cores int32 + } + Memory struct { + TotalGB float64 + UsedGB float64 + AvailableGB float64 + UsagePercent float64 + } + Storage []StorageInfo + HardwareSummary string // Formatted hardware summary + } + RunningTasks []ServiceTasks // Services with running tasks + RegisteredServices []string // All available service names + Network struct { + PeersCount int32 // Number of connected peers + PeerAddresses []string // List of peer addresses + } + Rank int32 // Rank in top supernodes list (0 if not in top list) + IPAddress string // Supernode IP address with port + // Optional detailed P2P metrics (present when requested) + P2PMetrics struct { + DhtMetrics struct { + StoreSuccessRecent []struct { + TimeUnix int64 + Requests int32 + Successful int32 + SuccessRate float64 + } + BatchRetrieveRecent []struct { + TimeUnix int64 + Keys int32 + Required int32 + FoundLocal int32 + FoundNetwork int32 + DurationMS int64 + } + HotPathBannedSkips int64 + HotPathBanIncrements int64 + } + NetworkHandleMetrics map[string]struct { + Total int64 + Success int64 + Failure int64 + Timeout int64 + } + ConnPoolMetrics map[string]int64 + BanList []struct { + ID string + IP string + Port uint32 + Count int32 + CreatedAtUnix int64 + AgeSeconds int64 + } + Database struct { + P2PDBSizeMB float64 + P2PDBRecordsCount int64 + } + Disk struct { + AllMB float64 + UsedMB float64 + FreeMB float64 + } + } } type CascadeSupernodeDownloadRequest struct { ActionID string diff --git a/sdk/task/cascade.go b/sdk/task/cascade.go index cabb0db1..4e25fc06 100644 --- a/sdk/task/cascade.go +++ b/sdk/task/cascade.go @@ -14,8 +14,9 @@ import ( type CascadeTask struct { BaseTask - filePath string - actionId string + filePath string + actionId string + skipArtifactStorage bool } // NewCascadeTask creates a new CascadeTask using a BaseTask plus cascade-specific parameters @@ -74,6 +75,7 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum var lastErr error for idx, sn := range supernodes { + req.SkipArtifactStorage = t.skipArtifactStorage // 1 t.LogEvent(ctx, event.SDKRegistrationAttempt, "attempting registration with supernode", event.EventData{ event.KeySupernode: sn.GrpcEndpoint, @@ -103,21 +105,25 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum } func (t *CascadeTask) attemptRegistration(ctx context.Context, _ int, sn lumera.Supernode, factory *net.ClientFactory, req *supernodeservice.CascadeSupernodeRegisterRequest) error { - client, err := factory.CreateClient(ctx, sn) - if err != nil { - return fmt.Errorf("create client %s: %w", sn.CosmosAddress, err) - } - defer client.Close(ctx) - - // Emit connection established event for observability - t.LogEvent(ctx, event.SDKConnectionEstablished, "Connection to supernode established", event.EventData{ - event.KeySupernode: sn.GrpcEndpoint, - event.KeySupernodeAddress: sn.CosmosAddress, - }) - - req.EventLogger = func(ctx context.Context, evt event.EventType, msg string, data event.EventData) { - t.LogEvent(ctx, evt, msg, data) - } + client, err := factory.CreateClient(ctx, sn) + if err != nil { + return fmt.Errorf("create client %s: %w", sn.CosmosAddress, err) + } + defer client.Close(ctx) + + // Emit connection established event for observability + t.LogEvent(ctx, event.SDKConnectionEstablished, "Connection to supernode established", event.EventData{ + event.KeySupernode: sn.GrpcEndpoint, + event.KeySupernodeAddress: sn.CosmosAddress, + }) + + req.EventLogger = func(ctx context.Context, evt event.EventType, msg string, data event.EventData) { + if evt == event.SupernodeArtefactsStored { + t.skipArtifactStorage = true + req.SkipArtifactStorage = true + } + t.LogEvent(ctx, evt, msg, data) + } // Use ctx directly; per-phase timers are applied inside the adapter resp, err := client.RegisterCascade(ctx, req) if err != nil { diff --git a/supernode/node/action/server/cascade/cascade_action_server.go b/supernode/node/action/server/cascade/cascade_action_server.go index b6b6112c..c3255614 100644 --- a/supernode/node/action/server/cascade/cascade_action_server.go +++ b/supernode/node/action/server/cascade/cascade_action_server.go @@ -7,11 +7,13 @@ import ( "os" pb "github.com/LumeraProtocol/supernode/v2/gen/supernode/action/cascade" + cascadecommon "github.com/LumeraProtocol/supernode/v2/pkg/cascade" "github.com/LumeraProtocol/supernode/v2/pkg/errors" "github.com/LumeraProtocol/supernode/v2/pkg/logtrace" cascadeService "github.com/LumeraProtocol/supernode/v2/supernode/services/cascade" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" ) type ActionServer struct { @@ -74,6 +76,12 @@ func (server *ActionServer) Register(stream pb.CascadeService_RegisterServer) er } ctx := stream.Context() + skipStorage := false + if md, ok := metadata.FromIncomingContext(ctx); ok { + if values := md.Get(cascadecommon.SkipArtifactStorageHeader); len(values) > 0 && values[0] == cascadecommon.SkipArtifactStorageHeaderValue { + skipStorage = true + } + } logtrace.Info(ctx, "client streaming request to upload cascade input data received", fields) const maxFileSize = 1 * 1024 * 1024 * 1024 // 1GB limit @@ -185,12 +193,15 @@ func (server *ActionServer) Register(stream pb.CascadeService_RegisterServer) er // Process the complete data task := server.factory.NewCascadeRegistrationTask() + fields[cascadecommon.LogFieldSkipStorage] = skipStorage + err = task.Register(ctx, &cascadeService.RegisterRequest{ - TaskID: metadata.TaskId, - ActionID: metadata.ActionId, - DataHash: hash, - DataSize: totalSize, - FilePath: targetPath, + TaskID: metadata.TaskId, + ActionID: metadata.ActionId, + DataHash: hash, + DataSize: totalSize, + FilePath: targetPath, + SkipArtifactStorage: skipStorage, }, func(resp *cascadeService.RegisterResponse) error { grpcResp := &pb.RegisterResponse{ EventType: pb.SupernodeEventType(resp.EventType), @@ -308,21 +319,21 @@ func (server *ActionServer) Download(req *pb.DownloadRequest, stream pb.CascadeS "chunk_size": chunkSize, }) - // Announce: file is ready to be served to the client - if err := stream.Send(&pb.DownloadResponse{ - ResponseType: &pb.DownloadResponse_Event{ - Event: &pb.DownloadEvent{ - EventType: pb.SupernodeEventType_SERVE_READY, - Message: "File available for download", - }, - }, - }); err != nil { - logtrace.Error(ctx, "failed to send serve-ready event", logtrace.Fields{logtrace.FieldError: err.Error()}) - return err - } - - // Split and stream the file using adaptive chunk size - for i := 0; i < len(restoredFile); i += chunkSize { + // Announce: file is ready to be served to the client + if err := stream.Send(&pb.DownloadResponse{ + ResponseType: &pb.DownloadResponse_Event{ + Event: &pb.DownloadEvent{ + EventType: pb.SupernodeEventType_SERVE_READY, + Message: "File available for download", + }, + }, + }); err != nil { + logtrace.Error(ctx, "failed to send serve-ready event", logtrace.Fields{logtrace.FieldError: err.Error()}) + return err + } + + // Split and stream the file using adaptive chunk size + for i := 0; i < len(restoredFile); i += chunkSize { end := i + chunkSize if end > len(restoredFile) { end = len(restoredFile) diff --git a/supernode/services/cascade/register.go b/supernode/services/cascade/register.go index 403d0428..89609a08 100644 --- a/supernode/services/cascade/register.go +++ b/supernode/services/cascade/register.go @@ -4,17 +4,19 @@ import ( "context" "os" + cascadecommon "github.com/LumeraProtocol/supernode/v2/pkg/cascade" "github.com/LumeraProtocol/supernode/v2/pkg/logtrace" "github.com/LumeraProtocol/supernode/v2/supernode/services/common" ) // RegisterRequest contains parameters for upload request type RegisterRequest struct { - TaskID string - ActionID string - DataHash []byte - DataSize int - FilePath string + TaskID string + ActionID string + DataHash []byte + DataSize int + FilePath string + SkipArtifactStorage bool } // RegisterResponse contains the result of upload @@ -157,18 +159,24 @@ func (task *CascadeRegistrationTask) Register( task.streamEvent(SupernodeEventTypeFinalizeSimulated, "Finalize simulation passed", "", send) /* 11. Persist artefacts -------------------------------------------------------- */ - // Persist artefacts to the P2P network. + // Persist artefacts to the P2P network unless explicitly skipped by the client. // Aggregation model (context): // - Each underlying StoreBatch returns (ratePct, requests) where requests is // the number of node RPCs. The aggregated success rate can be computed as a // weighted average by requests across metadata and symbol batches, yielding // an overall network success view for the action. - metrics, err := task.storeArtefacts(ctx, action.ActionID, rqidResp.RedundantMetadataFiles, encResp.SymbolsDir, fields) - if err != nil { - return err + if req.SkipArtifactStorage { + fields[cascadecommon.LogFieldSkipStorage] = true + logtrace.Info(ctx, "Artifact storage skipped at client request", fields) + task.streamEvent(SupernodeEventTypeArtefactsStored, cascadecommon.ArtifactStorageSkippedMessage, "", send) + } else { + metrics, err := task.storeArtefacts(ctx, action.ActionID, rqidResp.RedundantMetadataFiles, encResp.SymbolsDir, fields) + if err != nil { + return err + } + // Emit single-line metrics via helper to keep Register clean + task.emitArtefactsStored(ctx, metrics, fields, send) } - // Emit single-line metrics via helper to keep Register clean - task.emitArtefactsStored(ctx, metrics, fields, send) resp, err := task.LumeraClient.FinalizeAction(ctx, action.ActionID, rqidResp.RQIDs) if err != nil {