Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions gen/supernode/action/cascade/service.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 18 additions & 4 deletions pkg/codec/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package codec

import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
Expand All @@ -17,8 +18,8 @@ type DecodeRequest struct {
}

type DecodeResponse struct {
Path string
LayoutPath string
Path string
DecodeTmpDir string
}

func (rq *raptorQ) Decode(ctx context.Context, req DecodeRequest) (DecodeResponse, error) {
Expand Down Expand Up @@ -52,14 +53,27 @@ func (rq *raptorQ) Decode(ctx context.Context, req DecodeRequest) (DecodeRespons
}
logtrace.Info(ctx, "symbols written to disk", fields)

// ---------- write layout.json ---------- ←★
layoutPath := filepath.Join(symbolsDir, "layout.json")
layoutBytes, err := json.Marshal(req.Layout)
if err != nil {
fields[logtrace.FieldError] = err.Error()
return DecodeResponse{}, fmt.Errorf("marshal layout: %w", err)
}
if err := os.WriteFile(layoutPath, layoutBytes, 0o644); err != nil {
fields[logtrace.FieldError] = err.Error()
return DecodeResponse{}, fmt.Errorf("write layout file: %w", err)
}
logtrace.Info(ctx, "layout.json written", fields)

// Decode
outputPath := filepath.Join(symbolsDir, "output")
if err := processor.DecodeSymbols(symbolsDir, outputPath, ""); err != nil {
if err := processor.DecodeSymbols(symbolsDir, outputPath, layoutPath); err != nil {
fields[logtrace.FieldError] = err.Error()
_ = os.Remove(outputPath)
return DecodeResponse{}, fmt.Errorf("raptorq decode: %w", err)
}

logtrace.Info(ctx, "RaptorQ decoding completed successfully", fields)
return DecodeResponse{Path: outputPath, LayoutPath: ""}, nil
return DecodeResponse{Path: outputPath, DecodeTmpDir: symbolsDir}, nil
}
2 changes: 1 addition & 1 deletion proto/supernode/action/cascade/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ enum SupernodeEventType {
RQID_VERIFIED = 9;
ARTEFACTS_STORED = 10;
ACTION_FINALIZED = 11;
Artefacts_Downloaded = 12;
ARTEFACTS_DOWNLOADED = 12;
}

message HealthCheckRequest {}
Expand Down
23 changes: 23 additions & 0 deletions sdk/action/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Client interface {
GetTask(ctx context.Context, taskID string) (*task.TaskEntry, bool)
SubscribeToEvents(ctx context.Context, eventType event.EventType, handler event.Handler) error
SubscribeToAllEvents(ctx context.Context, handler event.Handler) error
DownloadCascade(ctx context.Context, actionID, outputPath string) (string, error)
}

// ClientImpl implements the Client interface
Expand Down Expand Up @@ -128,3 +129,25 @@ func (c *ClientImpl) SubscribeToAllEvents(ctx context.Context, handler event.Han

return nil
}

func (c *ClientImpl) DownloadCascade(
ctx context.Context,
actionID, outputPath string,
) (string, error) {

if actionID == "" {
return "", fmt.Errorf("actionID is empty")
}

taskID, err := c.taskManager.CreateDownloadTask(ctx, actionID, outputPath)
if err != nil {
return "", fmt.Errorf("create download task: %w", err)
}

c.logger.Info(ctx, "cascade download task created",
"task_id", taskID,
"action_id", actionID,
)

return taskID, nil
}
94 changes: 94 additions & 0 deletions sdk/adapters/supernodeservice/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,98 @@ func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *Casca
}, nil
}

// CascadeSupernodeDownload downloads a file from a supernode gRPC stream
func (a *cascadeAdapter) CascadeSupernodeDownload(
ctx context.Context,
in *CascadeSupernodeDownloadRequest,
opts ...grpc.CallOption,
) (*CascadeSupernodeDownloadResponse, error) {

ctx = net.AddCorrelationID(ctx)

// 1. Open gRPC stream (server-stream)
stream, err := a.client.Download(ctx, &cascade.DownloadRequest{
ActionId: in.ActionID,
}, opts...)
if err != nil {
a.logger.Error(ctx, "failed to create download stream",
"action_id", in.ActionID, "error", err)
return nil, err
}

// 2. Prepare destination file
outFile, err := os.Create(in.OutputPath)
if err != nil {
a.logger.Error(ctx, "failed to create output file",
"path", in.OutputPath, "error", err)
return nil, fmt.Errorf("create output file: %w", err)
}
defer outFile.Close()

var (
bytesWritten int64
chunkIndex int
)

// 3. Receive streamed responses
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return nil, fmt.Errorf("stream recv: %w", err)
}

switch x := resp.ResponseType.(type) {

// 3a. Progress / event message
case *cascade.DownloadResponse_Event:
a.logger.Info(ctx, "supernode event",
"event_type", x.Event.EventType,
"message", x.Event.Message,
"action_id", in.ActionID)

if in.EventLogger != nil {
in.EventLogger(ctx, toSdkEvent(x.Event.EventType), x.Event.Message, event.EventData{
event.KeyActionID: in.ActionID,
event.KeyEventType: x.Event.EventType,
event.KeyMessage: x.Event.Message,
})
}

// 3b. Actual data chunk
case *cascade.DownloadResponse_Chunk:
data := x.Chunk.Data
if len(data) == 0 {
continue
}
if _, err := outFile.Write(data); err != nil {
return nil, fmt.Errorf("write chunk: %w", err)
}

bytesWritten += int64(len(data))
chunkIndex++

a.logger.Debug(ctx, "received chunk",
"chunk_index", chunkIndex,
"chunk_size", len(data),
"bytes_written", bytesWritten)
}
}

a.logger.Info(ctx, "download complete",
"bytes_written", bytesWritten,
"path", in.OutputPath,
"action_id", in.ActionID)

return &CascadeSupernodeDownloadResponse{
Success: true,
Message: "artefact downloaded",
OutputPath: in.OutputPath,
}, nil
}

// toSdkEvent converts a supernode-side enum value into an internal SDK EventType.
func toSdkEvent(e cascade.SupernodeEventType) event.EventType {
switch e {
Expand All @@ -189,6 +281,8 @@ func toSdkEvent(e cascade.SupernodeEventType) event.EventType {
return event.SupernodeArtefactsStored
case cascade.SupernodeEventType_ACTION_FINALIZED:
return event.SupernodeActionFinalized
case cascade.SupernodeEventType_ARTEFACTS_DOWNLOADED:
return event.SupernodeArtefactsDownloaded
default:
return event.SupernodeUnknown
}
Expand Down
14 changes: 14 additions & 0 deletions sdk/adapters/supernodeservice/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,21 @@ type CascadeSupernodeRegisterResponse struct {
TxHash string
}

type CascadeSupernodeDownloadRequest struct {
ActionID string
TaskID string
OutputPath string
EventLogger LoggerFunc
}

type CascadeSupernodeDownloadResponse struct {
Success bool
Message string
OutputPath string
}

//go:generate mockery --name=CascadeServiceClient --output=testutil/mocks --outpkg=mocks --filename=cascade_service_mock.go
type CascadeServiceClient interface {
CascadeSupernodeRegister(ctx context.Context, in *CascadeSupernodeRegisterRequest, opts ...grpc.CallOption) (*CascadeSupernodeRegisterResponse, error)
CascadeSupernodeDownload(ctx context.Context, in *CascadeSupernodeDownloadRequest, opts ...grpc.CallOption) (*CascadeSupernodeDownloadResponse, error)
}
1 change: 1 addition & 0 deletions sdk/event/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (
KeyMessage EventDataKey = "message"
KeyProgress EventDataKey = "progress"
KeyEventType EventDataKey = "event_type"
KeyOutputPath EventDataKey = "output_path"

// Task specific keys
KeyTaskID EventDataKey = "task_id"
Expand Down
30 changes: 18 additions & 12 deletions sdk/event/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,27 @@ const (
SDKTaskTxHashReceived EventType = "sdk:txhash_received"
SDKTaskCompleted EventType = "sdk:completed"
SDKTaskFailed EventType = "sdk:failed"

SDKDownloadAttempt EventType = "sdk:download_attempt"
SDKDownloadFailure EventType = "sdk:download_failure"
SDKOutputPathReceived EventType = "sdk:output_path_received"
SDKDownloadSuccessful EventType = "sdk:download_successful"
)

const (
SupernodeActionRetrieved EventType = "supernode:action_retrieved"
SupernodeActionFeeVerified EventType = "supernode:action_fee_verified"
SupernodeTopCheckPassed EventType = "supernode:top_check_passed"
SupernodeMetadataDecoded EventType = "supernode:metadata_decoded"
SupernodeDataHashVerified EventType = "supernode:data_hash_verified"
SupernodeInputEncoded EventType = "supernode:input_encoded"
SupernodeSignatureVerified EventType = "supernode:signature_verified"
SupernodeRQIDGenerated EventType = "supernode:rqid_generated"
SupernodeRQIDVerified EventType = "supernode:rqid_verified"
SupernodeArtefactsStored EventType = "supernode:artefacts_stored"
SupernodeActionFinalized EventType = "supernode:action_finalized"
SupernodeUnknown EventType = "supernode:unknown"
SupernodeActionRetrieved EventType = "supernode:action_retrieved"
SupernodeActionFeeVerified EventType = "supernode:action_fee_verified"
SupernodeTopCheckPassed EventType = "supernode:top_check_passed"
SupernodeMetadataDecoded EventType = "supernode:metadata_decoded"
SupernodeDataHashVerified EventType = "supernode:data_hash_verified"
SupernodeInputEncoded EventType = "supernode:input_encoded"
SupernodeSignatureVerified EventType = "supernode:signature_verified"
SupernodeRQIDGenerated EventType = "supernode:rqid_generated"
SupernodeRQIDVerified EventType = "supernode:rqid_verified"
SupernodeArtefactsStored EventType = "supernode:artefacts_stored"
SupernodeActionFinalized EventType = "supernode:action_finalized"
SupernodeArtefactsDownloaded EventType = "supernode:artefacts_downloaded"
SupernodeUnknown EventType = "supernode:unknown"
)

// EventData is a map of event data attributes using standardized keys
Expand Down
3 changes: 3 additions & 0 deletions sdk/net/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ type SupernodeClient interface {
// HealthCheck performs a health check on the supernode
HealthCheck(ctx context.Context) (*grpc_health_v1.HealthCheckResponse, error)

// Download downloads the cascade action file
Download(ctx context.Context, in *supernodeservice.CascadeSupernodeDownloadRequest, opts ...grpc.CallOption) (*supernodeservice.CascadeSupernodeDownloadResponse, error)

// Close releases resources used by the client
Close(ctx context.Context) error
}
10 changes: 10 additions & 0 deletions sdk/net/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,16 @@ func (c *supernodeClient) HealthCheck(ctx context.Context) (*grpc_health_v1.Heal
return resp, nil
}

// Download downloads the cascade action file
func (c *supernodeClient) Download(ctx context.Context, in *supernodeservice.CascadeSupernodeDownloadRequest, opts ...grpc.CallOption) (*supernodeservice.CascadeSupernodeDownloadResponse, error) {
resp, err := c.cascadeClient.CascadeSupernodeDownload(ctx, in, opts...)
if err != nil {
return nil, fmt.Errorf("get artefacts failed: %w", err)
}

return resp, nil
}

// Close closes the connection to the supernode
func (c *supernodeClient) Close(ctx context.Context) error {
if c.conn != nil {
Expand Down
Loading