diff --git a/supernode/adaptors/p2p.go b/supernode/adaptors/p2p.go index 036ec24a..e9a60b7b 100644 --- a/supernode/adaptors/p2p.go +++ b/supernode/adaptors/p2p.go @@ -134,6 +134,28 @@ func (p *p2pImpl) storeCascadeSymbolsAndData(ctx context.Context, taskID, action totalBytesStored := 0 metadataBytesStored := 0 firstBatchProcessed := false + if len(keys) == 0 && len(metadataFiles) > 0 { + logtrace.Info(ctx, "store: batch send (metadata-only)", logtrace.Fields{ + "taskID": taskID, + "metadata_count": len(metadataFiles), + "metadata_bytes": metadataBytes, + "metadata_mb_est": utils.BytesIntToMB(metadataBytes), + }) + bctx, cancel := context.WithTimeout(ctx, storeBatchContextTimeout) + err = p.p2p.StoreBatch(bctx, metadataFiles, P2PDataRaptorQSymbol, taskID) + cancel() + if err != nil { + return totalSymbols, totalAvailable, fmt.Errorf("p2p store batch (metadata-only): %w", err) + } + logtrace.Info(ctx, "store: batch ok (metadata-only)", logtrace.Fields{ + "taskID": taskID, + "metadata_count": len(metadataFiles), + "metadata_bytes": metadataBytes, + }) + totalBytesStored += metadataBytes + metadataBytesStored += metadataBytes + firstBatchProcessed = true + } for start := 0; start < len(keys); { end := min(start+loadSymbolsBatchSize, len(keys)) batch := keys[start:end] diff --git a/supernode/adaptors/p2p_test.go b/supernode/adaptors/p2p_test.go index 66a94d81..baa51a37 100644 --- a/supernode/adaptors/p2p_test.go +++ b/supernode/adaptors/p2p_test.go @@ -7,7 +7,10 @@ import ( "testing" "github.com/LumeraProtocol/supernode/v2/p2p" + p2pmock "github.com/LumeraProtocol/supernode/v2/p2p/mocks" + "github.com/LumeraProtocol/supernode/v2/pkg/codec" "github.com/LumeraProtocol/supernode/v2/pkg/storage/rqstore" + "github.com/stretchr/testify/mock" "go.uber.org/mock/gomock" ) @@ -18,6 +21,14 @@ type clientWithPeersCount struct { func (c clientWithPeersCount) PeersCount() int { return c.peers } +type p2pClientWithStreamMock struct { + *p2pmock.Client +} + +func (c p2pClientWithStreamMock) BatchRetrieveStream(_ context.Context, _ []string, _ int32, _ string, _ func(string, []byte) error, _ ...bool) (int32, error) { + return 0, nil +} + func TestStoreArtefacts_ZeroPeers_ReturnsError(t *testing.T) { svc := NewP2PService(clientWithPeersCount{peers: 0}, nil) @@ -52,3 +63,50 @@ func TestStoreArtefacts_PeersPresent_DoesNotTripGuard(t *testing.T) { } } +func TestStoreCascadeSymbolsAndData_MetadataOnlyBatchWhenNoSymbols(t *testing.T) { + ctrl := gomock.NewController(t) + t.Cleanup(ctrl.Finish) + + store := rqstore.NewMockStore(ctrl) + store.EXPECT().StoreSymbolDirectory("task", "").Return(nil) + store.EXPECT().UpdateIsFirstBatchStored("task").Return(nil) + + metadata := [][]byte{[]byte("index-bytes"), []byte("layout-bytes")} + baseClient := p2pmock.NewClient(t) + baseClient.On("StoreBatch", mock.Anything, metadata, P2PDataRaptorQSymbol, "task").Return(nil).Once() + + svc := &p2pImpl{p2p: p2pClientWithStreamMock{Client: baseClient}, rqStore: store} + stored, total, err := svc.storeCascadeSymbolsAndData(context.Background(), "task", "action", "", metadata, codec.Layout{Blocks: []codec.Block{{BlockID: 0}}}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if stored != 0 { + t.Fatalf("expected 0 stored symbols, got %d", stored) + } + if total != 0 { + t.Fatalf("expected 0 total symbols, got %d", total) + } +} + +func TestStoreCascadeSymbolsAndData_MetadataOnlyBatchFailureSkipsFirstBatchFlag(t *testing.T) { + ctrl := gomock.NewController(t) + t.Cleanup(ctrl.Finish) + + store := rqstore.NewMockStore(ctrl) + store.EXPECT().StoreSymbolDirectory("task", "").Return(nil) + store.EXPECT().UpdateIsFirstBatchStored("task").Times(0) + + metadata := [][]byte{[]byte("index-bytes")} + baseClient := p2pmock.NewClient(t) + baseClient.On("StoreBatch", mock.Anything, metadata, P2PDataRaptorQSymbol, "task").Return(errors.New("p2p down")).Once() + + svc := &p2pImpl{p2p: p2pClientWithStreamMock{Client: baseClient}, rqStore: store} + _, _, err := svc.storeCascadeSymbolsAndData(context.Background(), "task", "action", "", metadata, codec.Layout{Blocks: []codec.Block{{BlockID: 0}}}) + if err == nil { + t.Fatalf("expected error, got nil") + } + if !strings.Contains(err.Error(), "metadata-only") { + t.Fatalf("expected metadata-only path error, got: %v", err) + } +} +