From d578a9a676e591469b6bd95d33eec25a605fbdff Mon Sep 17 00:00:00 2001 From: Matee ullah Date: Fri, 3 Oct 2025 18:08:02 +0500 Subject: [PATCH] fx --- cmd/sncli/go.mod | 1 + cmd/sncli/go.sum | 6 +++-- p2p/client.go | 2 +- p2p/kademlia/dht.go | 24 +++++++++++++++---- p2p/mocks/Client.go | 2 +- p2p/p2p.go | 4 ++-- pkg/codec/codec.go | 3 +++ .../cascade/adaptors/mocks/rq_mock.go | 15 ++++++++++++ supernode/services/cascade/adaptors/rq.go | 5 ++++ supernode/services/cascade/download.go | 22 +++++++++++------ tests/integration/p2p/p2p_integration_test.go | 6 +++-- 11 files changed, 71 insertions(+), 19 deletions(-) diff --git a/cmd/sncli/go.mod b/cmd/sncli/go.mod index ef7bb7e0..e6f5c022 100644 --- a/cmd/sncli/go.mod +++ b/cmd/sncli/go.mod @@ -66,6 +66,7 @@ require ( github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f // indirect github.com/dgraph-io/badger/v4 v4.2.0 // indirect github.com/dgraph-io/ristretto v0.1.1 // indirect + github.com/dgraph-io/ristretto/v2 v2.2.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/dvsekhvalnov/jose2go v1.6.0 // indirect github.com/emicklei/dot v1.6.2 // indirect diff --git a/cmd/sncli/go.sum b/cmd/sncli/go.sum index 95b01d4a..c80f3a25 100644 --- a/cmd/sncli/go.sum +++ b/cmd/sncli/go.sum @@ -222,10 +222,12 @@ github.com/dgraph-io/badger/v4 v4.2.0 h1:kJrlajbXXL9DFTNuhhu9yCx7JJa4qpYWxtE8Bzu github.com/dgraph-io/badger/v4 v4.2.0/go.mod h1:qfCqhPoWDFJRx1gp5QwwyGo8xk1lbHUxvK9nK0OGAak= github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8= github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA= +github.com/dgraph-io/ristretto/v2 v2.2.0 h1:bkY3XzJcXoMuELV8F+vS8kzNgicwQFAaGINAEJdWGOM= +github.com/dgraph-io/ristretto/v2 v2.2.0/go.mod h1:RZrm63UmcBAaYWC1DotLYBmTvgkrs0+XhBd7Npn7/zI= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= -github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y= -github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da h1:aIftn67I1fkbMa512G+w+Pxci9hJPB8oMnkcP3iZF38= +github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= diff --git a/p2p/client.go b/p2p/client.go index 5d4a44be..b9834eec 100644 --- a/p2p/client.go +++ b/p2p/client.go @@ -17,7 +17,7 @@ type Client interface { // BatchRetrieve retrieve data from the kademlia network by keys // reqCount is the minimum number of keys that are actually required by the caller // to successfully perform the reuquired operation - BatchRetrieve(ctx context.Context, keys []string, reqCount int, txID string, localOnly ...bool) (map[string][]byte, error) + BatchRetrieve(ctx context.Context, keys []string, reqCount int, txID string, symbolWriter func(symbolID string, data []byte) error, localOnly ...bool) (map[string][]byte, error) // Store store data to the network, which will trigger the iterative store message // - the base58 encoded identifier will be returned Store(ctx context.Context, data []byte, typ int) (string, error) diff --git a/p2p/kademlia/dht.go b/p2p/kademlia/dht.go index 69c45023..260847d7 100644 --- a/p2p/kademlia/dht.go +++ b/p2p/kademlia/dht.go @@ -665,7 +665,7 @@ func (s *DHT) fetchAndAddLocalKeys(ctx context.Context, hexKeys []string, result return count, err } -func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, txID string, localOnly ...bool) (result map[string][]byte, err error) { +func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, txID string, writer func(symbolID string, data []byte) error, localOnly ...bool) (result map[string][]byte, err error) { start := time.Now() result = make(map[string][]byte) var resMap sync.Map @@ -779,6 +779,7 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, &networkFound, cancel, txID, + writer, ) } @@ -809,6 +810,7 @@ func (s *DHT) processBatch( networkFound *int32, cancel context.CancelFunc, txID string, + writer func(symbolID string, data []byte) error, ) { defer wg.Done() defer func() { <-semaphore }() @@ -836,7 +838,7 @@ func (s *DHT) processBatch( } foundCount, newClosestContacts, batchErr := s.iterateBatchGetValues( - ctx, knownNodes, batchKeys, batchHexKeys, fetchMap, resMap, required, foundLocalCount+atomic.LoadInt32(networkFound), + ctx, knownNodes, batchKeys, batchHexKeys, fetchMap, resMap, required, foundLocalCount+atomic.LoadInt32(networkFound), writer, ) if batchErr != nil { logtrace.Error(ctx, "Iterate batch get values failed", logtrace.Fields{ @@ -886,8 +888,8 @@ func (s *DHT) processBatch( } func (s *DHT) iterateBatchGetValues(ctx context.Context, nodes map[string]*Node, keys []string, hexKeys []string, fetchMap map[string][]int, - resMap *sync.Map, req, alreadyFound int32) (int, map[string]*NodeList, error) { - semaphore := make(chan struct{}, storeSameSymbolsBatchConcurrency) // Limit concurrency to 1 + resMap *sync.Map, req, alreadyFound int32, writer func(symbolID string, data []byte) error) (int, map[string]*NodeList, error) { + semaphore := make(chan struct{}, storeSameSymbolsBatchConcurrency) // Limit concurrency to 3 closestContacts := make(map[string]*NodeList) var wg sync.WaitGroup contactsMap := make(map[string]map[string][]*Node) @@ -898,6 +900,10 @@ func (s *DHT) iterateBatchGetValues(ctx context.Context, nodes map[string]*Node, gctx, cancel := context.WithCancel(ctx) // Create a cancellable context defer cancel() for nodeID, node := range nodes { + if _, ok := fetchMap[nodeID]; !ok { + continue + } + if s.ignorelist.Banned(node) { logtrace.Info(ctx, "Ignore banned node in iterate batch get values", logtrace.Fields{ logtrace.FieldModule: "dht", @@ -962,6 +968,16 @@ func (s *DHT) iterateBatchGetValues(ctx context.Context, nodes map[string]*Node, if len(v.Value) > 0 { _, loaded := resMap.LoadOrStore(k, v.Value) if !loaded { + if writer != nil { + // decode k (hex) back to base58 key if your writer expects that + // or just pass the hex; you control the writer side. + if err := writer(k, v.Value); err != nil { + // you can choose to log and continue, or treat as failure + // here we'll log and continue to avoid losing the rest + logtrace.Error(ctx, "writer error", logtrace.Fields{"key": k, logtrace.FieldError: err.Error()}) + } + } + atomic.AddInt32(&foundCount, 1) returned++ if atomic.LoadInt32(&foundCount) >= int32(req-alreadyFound) { diff --git a/p2p/mocks/Client.go b/p2p/mocks/Client.go index 67991025..273eb00a 100644 --- a/p2p/mocks/Client.go +++ b/p2p/mocks/Client.go @@ -16,7 +16,7 @@ type Client struct { } // BatchRetrieve provides a mock function with given fields: ctx, keys, reqCount, txID, localOnly -func (_m *Client) BatchRetrieve(ctx context.Context, keys []string, reqCount int, txID string, localOnly ...bool) (map[string][]byte, error) { +func (_m *Client) BatchRetrieve(ctx context.Context, keys []string, reqCount int, txID string, writer func(string, []byte) error, localOnly ...bool) (map[string][]byte, error) { _va := make([]interface{}, len(localOnly)) for _i := range localOnly { _va[_i] = localOnly[_i] diff --git a/p2p/p2p.go b/p2p/p2p.go index e3d6b40a..18888940 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -137,13 +137,13 @@ func (s *p2p) Retrieve(ctx context.Context, key string, localOnly ...bool) ([]by } // BatchRetrieve retrive the data from the kademlia network -func (s *p2p) BatchRetrieve(ctx context.Context, keys []string, reqCount int, txID string, localOnly ...bool) (map[string][]byte, error) { +func (s *p2p) BatchRetrieve(ctx context.Context, keys []string, reqCount int, txID string, writer func(symbolID string, data []byte) error, localOnly ...bool) (map[string][]byte, error) { if !s.running { return nil, errors.New("p2p service is not running") } - return s.dht.BatchRetrieve(ctx, keys, int32(reqCount), txID, localOnly...) + return s.dht.BatchRetrieve(ctx, keys, int32(reqCount), txID, writer, localOnly...) } // Delete delete key in queries node diff --git a/pkg/codec/codec.go b/pkg/codec/codec.go index 39029569..7f42e6ab 100644 --- a/pkg/codec/codec.go +++ b/pkg/codec/codec.go @@ -38,4 +38,7 @@ type Codec interface { // Encode a file Encode(ctx context.Context, req EncodeRequest) (EncodeResponse, error) Decode(ctx context.Context, req DecodeRequest) (DecodeResponse, error) + PrepareDecode(ctx context.Context, actionID string, layout Layout) (blockPaths []string, + Write func(block int, symbolID string, data []byte) (string, error), Cleanup func() error, ws *Workspace, err error) + DecodeFromPrepared(ctx context.Context, ws *Workspace, layout Layout) (DecodeResponse, error) } diff --git a/supernode/services/cascade/adaptors/mocks/rq_mock.go b/supernode/services/cascade/adaptors/mocks/rq_mock.go index f45f2eb5..8b64f904 100644 --- a/supernode/services/cascade/adaptors/mocks/rq_mock.go +++ b/supernode/services/cascade/adaptors/mocks/rq_mock.go @@ -51,6 +51,21 @@ func (mr *MockCodecServiceMockRecorder) Decode(ctx, req interface{}) *gomock.Cal return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Decode", reflect.TypeOf((*MockCodecService)(nil).Decode), ctx, req) } +// DecodeFromPrepared mocks base method. +func (m *MockCodecService) DecodeFromPrepared(ctx context.Context, ws *codec.Workspace, layout codec.Layout) (adaptors.DecodeResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DecodeFromPrepared", ctx, ws, layout) + ret0, _ := ret[0].(adaptors.DecodeResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DecodeFromPrepared indicates an expected call of DecodeFromPrepared. +func (mr *MockCodecServiceMockRecorder) DecodeFromPrepared(ctx, ws, layout interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DecodeFromPrepared", reflect.TypeOf((*MockCodecService)(nil).DecodeFromPrepared), ctx, ws, layout) +} + // EncodeInput mocks base method. func (m *MockCodecService) EncodeInput(ctx context.Context, taskID, path string, dataSize int) (adaptors.EncodeResult, error) { m.ctrl.T.Helper() diff --git a/supernode/services/cascade/adaptors/rq.go b/supernode/services/cascade/adaptors/rq.go index 92e89819..9fab3cdb 100644 --- a/supernode/services/cascade/adaptors/rq.go +++ b/supernode/services/cascade/adaptors/rq.go @@ -13,6 +13,7 @@ type CodecService interface { EncodeInput(ctx context.Context, taskID string, path string, dataSize int) (EncodeResult, error) PrepareDecode(ctx context.Context, actionID string, layout codec.Layout) (blockPaths []string, Write func(block int, symbolID string, data []byte) (string, error), Cleanup func() error, ws *codec.Workspace, err error) Decode(ctx context.Context, req DecodeRequest) (DecodeResponse, error) + DecodeFromPrepared(ctx context.Context, ws *codec.Workspace, layout codec.Layout) (DecodeResponse, error) } // EncodeResult represents the outcome of encoding the input data. @@ -79,3 +80,7 @@ func (c *codecImpl) Decode(ctx context.Context, req DecodeRequest) (DecodeRespon func (c *codecImpl) PrepareDecode(ctx context.Context, actionID string, layout codec.Layout) (blockPaths []string, Write func(block int, symbolID string, data []byte) (string, error), Cleanup func() error, ws *codec.Workspace, err error) { return } + +func (c *codecImpl) DecodeFromPrepared(ctx context.Context, ws *codec.Workspace, layout codec.Layout) (DecodeResponse, error) { + return DecodeResponse{}, nil +} diff --git a/supernode/services/cascade/download.go b/supernode/services/cascade/download.go index 363834bc..e56c090a 100644 --- a/supernode/services/cascade/download.go +++ b/supernode/services/cascade/download.go @@ -16,7 +16,6 @@ import ( "github.com/LumeraProtocol/supernode/v2/pkg/logtrace" cm "github.com/LumeraProtocol/supernode/v2/pkg/p2pmetrics" "github.com/LumeraProtocol/supernode/v2/pkg/utils" - "github.com/LumeraProtocol/supernode/v2/supernode/services/cascade/adaptors" "github.com/LumeraProtocol/supernode/v2/supernode/services/common" ) @@ -179,7 +178,20 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout( if reqCount > totalSymbols { reqCount = totalSymbols } - symbols, err := task.P2PClient.BatchRetrieve(ctxRetrieve, allSymbols, reqCount, actionID) + + _, writeSymbol, _, ws, perr := task.RQ.PrepareDecode(ctx, actionID, layout) + if perr != nil { + return "", "", fmt.Errorf("prepare decode: %w", perr) + } + + // bridge to DHT: it expects SymbolWriter(string, []byte) + writer := func(symbolID string, data []byte) error { + // single-block: always write to block 0 + _, err := writeSymbol(0, symbolID, data) + return err + } + + _, err := task.P2PClient.BatchRetrieve(ctxRetrieve, allSymbols, reqCount, actionID, writer) if err != nil { fields[logtrace.FieldError] = err.Error() logtrace.Error(ctx, "batch retrieve failed", fields) @@ -189,11 +201,7 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout( // Measure decode duration decodeStart := time.Now() - decodeInfo, err := task.RQ.Decode(ctx, adaptors.DecodeRequest{ - ActionID: actionID, - Symbols: symbols, - Layout: layout, - }) + decodeInfo, err := task.RQ.DecodeFromPrepared(ctx, ws, layout) if err != nil { fields[logtrace.FieldError] = err.Error() logtrace.Error(ctx, "decode failed", fields) diff --git a/tests/integration/p2p/p2p_integration_test.go b/tests/integration/p2p/p2p_integration_test.go index bce71f58..848d62b2 100644 --- a/tests/integration/p2p/p2p_integration_test.go +++ b/tests/integration/p2p/p2p_integration_test.go @@ -108,7 +108,7 @@ func TestP2PBasicIntegration(t *testing.T) { // Add debug logging log.Printf("Storing batch with keys: %v", expectedKeys) - err := services[0].StoreBatch(ctx, batchData, 0, taskID) + err := services[0].StoreBatch(ctx, batchData, 0, taskID) require.NoError(t, err) // Add immediate verification @@ -122,7 +122,9 @@ func TestP2PBasicIntegration(t *testing.T) { } // Now try batch retrieve - retrieved, err := services[0].BatchRetrieve(ctx, expectedKeys, batchSize, taskID) + retrieved, err := services[0].BatchRetrieve(ctx, expectedKeys, batchSize, taskID, func(symbolID string, data []byte) error { + return nil + }) require.NoError(t, err) require.Equal(t, batchSize, len(retrieved), "Expected %d items, got %d", batchSize, len(retrieved))