From 4e4664edad53654168304baf9d52e2625e93445f Mon Sep 17 00:00:00 2001 From: Matee ullah Date: Fri, 3 Oct 2025 18:08:02 +0500 Subject: [PATCH 1/4] fx --- p2p/client.go | 2 +- p2p/kademlia/dht.go | 103 ++++++++++-------- p2p/mocks/Client.go | 2 +- p2p/p2p.go | 4 +- pkg/codec/codec.go | 14 ++- supernode/adaptors/rq.go | 30 +++-- supernode/cascade/download.go | 56 ++++++---- tests/integration/p2p/p2p_integration_test.go | 4 +- 8 files changed, 131 insertions(+), 84 deletions(-) diff --git a/p2p/client.go b/p2p/client.go index 5d4a44be..b9834eec 100644 --- a/p2p/client.go +++ b/p2p/client.go @@ -17,7 +17,7 @@ type Client interface { // BatchRetrieve retrieve data from the kademlia network by keys // reqCount is the minimum number of keys that are actually required by the caller // to successfully perform the reuquired operation - BatchRetrieve(ctx context.Context, keys []string, reqCount int, txID string, localOnly ...bool) (map[string][]byte, error) + BatchRetrieve(ctx context.Context, keys []string, reqCount int, txID string, symbolWriter func(symbolID string, data []byte) error, localOnly ...bool) (map[string][]byte, error) // Store store data to the network, which will trigger the iterative store message // - the base58 encoded identifier will be returned Store(ctx context.Context, data []byte, typ int) (string, error) diff --git a/p2p/kademlia/dht.go b/p2p/kademlia/dht.go index 13615deb..d84127ea 100644 --- a/p2p/kademlia/dht.go +++ b/p2p/kademlia/dht.go @@ -682,8 +682,8 @@ func (s *DHT) fetchAndAddLocalKeys(ctx context.Context, hexKeys []string, result return count, err } -func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, txID string, localOnly ...bool) (result map[string][]byte, err error) { - logtrace.Debug(ctx, "DHT BatchRetrieve begin", logtrace.Fields{"txid": txID, "keys": len(keys), "required": required}) +func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, txID string, writer func(symbolID string, data []byte) error, localOnly ...bool) (result map[string][]byte, err error) { + logtrace.Debug(ctx, "DHT BatchRetrieve begin", logtrace.Fields{"txid": txID, "keys": len(keys), "required": required}) result = make(map[string][]byte) var resMap sync.Map var foundLocalCount int32 @@ -805,34 +805,35 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, break } - wg.Add(1) - go func(start, end int) { - defer wg.Done() - - if err := sem.Acquire(gctx, 1); err != nil { - return - } - defer sem.Release(1) - - if atomic.LoadInt32(&networkFound)+int32(foundLocalCount) >= int32(required) { - return - } - - s.processBatch( - gctx, - keys[start:end], - hexKeys[start:end], - globalClosestContacts, - &closestMu, - knownNodes, &knownMu, - &resMap, - required, - foundLocalCount, - &networkFound, - cancel, - txID, - ) - }(start, end) + wg.Add(1) + go func(start, end int) { + defer wg.Done() + + if err := sem.Acquire(gctx, 1); err != nil { + return + } + defer sem.Release(1) + + if atomic.LoadInt32(&networkFound)+int32(foundLocalCount) >= int32(required) { + return + } + + s.processBatch( + gctx, + keys[start:end], + hexKeys[start:end], + globalClosestContacts, + &closestMu, + knownNodes, &knownMu, + &resMap, + required, + foundLocalCount, + &networkFound, + cancel, + txID, + writer, + ) + }(start, end) } wg.Wait() @@ -866,6 +867,7 @@ func (s *DHT) processBatch( networkFound *int32, cancel context.CancelFunc, txID string, + writer func(symbolID string, data []byte) error, ) { select { case <-ctx.Done(): @@ -893,14 +895,14 @@ func (s *DHT) processBatch( } } - foundCount, batchErr := s.iterateBatchGetValues( - ctx, knownNodes, batchHexKeys, fetchMap, resMap, required, foundLocalCount+atomic.LoadInt32(networkFound), - ) - if batchErr != nil { - logtrace.Error(ctx, "Iterate batch get values failed", logtrace.Fields{ - logtrace.FieldModule: "dht", "txid": txID, logtrace.FieldError: batchErr.Error(), - }) - } + foundCount, batchErr := s.iterateBatchGetValues( + ctx, knownNodes, batchHexKeys, fetchMap, resMap, required, foundLocalCount+atomic.LoadInt32(networkFound), writer, + ) + if batchErr != nil { + logtrace.Error(ctx, "Iterate batch get values failed", logtrace.Fields{ + logtrace.FieldModule: "dht", "txid": txID, logtrace.FieldError: batchErr.Error(), + }) + } atomic.AddInt32(networkFound, int32(foundCount)) if atomic.LoadInt32(networkFound)+int32(foundLocalCount) >= int32(required) { @@ -909,8 +911,8 @@ func (s *DHT) processBatch( } func (s *DHT) iterateBatchGetValues(ctx context.Context, nodes map[string]*Node, hexKeys []string, fetchMap map[string][]int, - resMap *sync.Map, req, alreadyFound int32) (int, error) { - sem := semaphore.NewWeighted(int64(storeSameSymbolsBatchConcurrency)) + resMap *sync.Map, req, alreadyFound int32, writer func(symbolID string, data []byte) error) (int, error) { + sem := semaphore.NewWeighted(int64(storeSameSymbolsBatchConcurrency)) var wg sync.WaitGroup var firstErr error var mu sync.Mutex // To protect the firstErr @@ -918,12 +920,11 @@ func (s *DHT) iterateBatchGetValues(ctx context.Context, nodes map[string]*Node, gctx, cancel := context.WithCancel(ctx) // Create a cancellable context defer cancel() - - for nodeID := range fetchMap { - node, ok := nodes[nodeID] - if !ok { - continue - } + for nodeID := range fetchMap { + node, ok := nodes[nodeID] + if !ok { + continue + } if s.ignorelist.Banned(node) { logtrace.Debug(ctx, "Ignore banned node in iterate batch get values", logtrace.Fields{ @@ -979,6 +980,16 @@ func (s *DHT) iterateBatchGetValues(ctx context.Context, nodes map[string]*Node, if len(v.Value) > 0 { _, loaded := resMap.LoadOrStore(k, v.Value) if !loaded { + if writer != nil { + // decode k (hex) back to base58 key if your writer expects that + // or just pass the hex; you control the writer side. + if err := writer(k, v.Value); err != nil { + // you can choose to log and continue, or treat as failure + // here we'll log and continue to avoid losing the rest + logtrace.Error(ctx, "writer error", logtrace.Fields{"key": k, logtrace.FieldError: err.Error()}) + } + } + atomic.AddInt32(&foundCount, 1) returned++ if atomic.LoadInt32(&foundCount) >= int32(req-alreadyFound) { diff --git a/p2p/mocks/Client.go b/p2p/mocks/Client.go index 67991025..273eb00a 100644 --- a/p2p/mocks/Client.go +++ b/p2p/mocks/Client.go @@ -16,7 +16,7 @@ type Client struct { } // BatchRetrieve provides a mock function with given fields: ctx, keys, reqCount, txID, localOnly -func (_m *Client) BatchRetrieve(ctx context.Context, keys []string, reqCount int, txID string, localOnly ...bool) (map[string][]byte, error) { +func (_m *Client) BatchRetrieve(ctx context.Context, keys []string, reqCount int, txID string, writer func(string, []byte) error, localOnly ...bool) (map[string][]byte, error) { _va := make([]interface{}, len(localOnly)) for _i := range localOnly { _va[_i] = localOnly[_i] diff --git a/p2p/p2p.go b/p2p/p2p.go index f9a5f74e..3d84042d 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -137,13 +137,13 @@ func (s *p2p) Retrieve(ctx context.Context, key string, localOnly ...bool) ([]by } // BatchRetrieve retrive the data from the kademlia network -func (s *p2p) BatchRetrieve(ctx context.Context, keys []string, reqCount int, txID string, localOnly ...bool) (map[string][]byte, error) { +func (s *p2p) BatchRetrieve(ctx context.Context, keys []string, reqCount int, txID string, writer func(symbolID string, data []byte) error, localOnly ...bool) (map[string][]byte, error) { if !s.running { return nil, errors.New("p2p service is not running") } - return s.dht.BatchRetrieve(ctx, keys, int32(reqCount), txID, localOnly...) + return s.dht.BatchRetrieve(ctx, keys, int32(reqCount), txID, writer, localOnly...) } // Delete delete key in queries node diff --git a/pkg/codec/codec.go b/pkg/codec/codec.go index 73c31a2a..433994e8 100644 --- a/pkg/codec/codec.go +++ b/pkg/codec/codec.go @@ -42,9 +42,13 @@ type CreateMetadataResponse struct { // RaptorQ contains methods for request services from RaptorQ service. type Codec interface { - // Encode a file - Encode(ctx context.Context, req EncodeRequest) (EncodeResponse, error) - Decode(ctx context.Context, req DecodeRequest) (DecodeResponse, error) - // without generating RaptorQ symbols. - CreateMetadata(ctx context.Context, req CreateMetadataRequest) (CreateMetadataResponse, error) + // Encode a file + Encode(ctx context.Context, req EncodeRequest) (EncodeResponse, error) + Decode(ctx context.Context, req DecodeRequest) (DecodeResponse, error) + // without generating RaptorQ symbols. + CreateMetadata(ctx context.Context, req CreateMetadataRequest) (CreateMetadataResponse, error) + // Streaming decode helpers for writing symbols directly to disk prior to decode + PrepareDecode(ctx context.Context, actionID string, layout Layout) (blockPaths []string, + Write func(block int, symbolID string, data []byte) (string, error), Cleanup func() error, ws *Workspace, err error) + DecodeFromPrepared(ctx context.Context, ws *Workspace, layout Layout) (DecodeResponse, error) } diff --git a/supernode/adaptors/rq.go b/supernode/adaptors/rq.go index b8efa1dd..f414be57 100644 --- a/supernode/adaptors/rq.go +++ b/supernode/adaptors/rq.go @@ -9,8 +9,11 @@ import ( // CodecService wraps codec operations used by cascade type CodecService interface { - EncodeInput(ctx context.Context, actionID string, filePath string) (EncodeResult, error) - Decode(ctx context.Context, req DecodeRequest) (DecodeResult, error) + EncodeInput(ctx context.Context, actionID string, filePath string) (EncodeResult, error) + Decode(ctx context.Context, req DecodeRequest) (DecodeResult, error) + PrepareDecode(ctx context.Context, actionID string, layout codec.Layout) (blockPaths []string, + Write func(block int, symbolID string, data []byte) (string, error), Cleanup func() error, ws *codec.Workspace, err error) + DecodeFromPrepared(ctx context.Context, ws *codec.Workspace, layout codec.Layout) (DecodeResult, error) } type EncodeResult struct { @@ -46,9 +49,22 @@ func (c *codecImpl) EncodeInput(ctx context.Context, actionID, filePath string) } func (c *codecImpl) Decode(ctx context.Context, req DecodeRequest) (DecodeResult, error) { - res, err := c.codec.Decode(ctx, codec.DecodeRequest{ActionID: req.ActionID, Symbols: req.Symbols, Layout: req.Layout}) - if err != nil { - return DecodeResult{}, err - } - return DecodeResult{FilePath: res.FilePath, DecodeTmpDir: res.DecodeTmpDir}, nil + res, err := c.codec.Decode(ctx, codec.DecodeRequest{ActionID: req.ActionID, Symbols: req.Symbols, Layout: req.Layout}) + if err != nil { + return DecodeResult{}, err + } + return DecodeResult{FilePath: res.FilePath, DecodeTmpDir: res.DecodeTmpDir}, nil +} + +func (c *codecImpl) PrepareDecode(ctx context.Context, actionID string, layout codec.Layout) (blockPaths []string, + Write func(block int, symbolID string, data []byte) (string, error), Cleanup func() error, ws *codec.Workspace, err error) { + return c.codec.PrepareDecode(ctx, actionID, layout) +} + +func (c *codecImpl) DecodeFromPrepared(ctx context.Context, ws *codec.Workspace, layout codec.Layout) (DecodeResult, error) { + res, err := c.codec.DecodeFromPrepared(ctx, ws, layout) + if err != nil { + return DecodeResult{}, err + } + return DecodeResult{FilePath: res.FilePath, DecodeTmpDir: res.DecodeTmpDir}, nil } diff --git a/supernode/cascade/download.go b/supernode/cascade/download.go index 7378f920..e027c817 100644 --- a/supernode/cascade/download.go +++ b/supernode/cascade/download.go @@ -195,27 +195,41 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout(ctx context.Context, if reqCount > totalSymbols { reqCount = totalSymbols } - rStart := time.Now() - logtrace.Info(ctx, "download: batch retrieve start", logtrace.Fields{"action_id": actionID, "requested": reqCount, "total_candidates": totalSymbols}) - symbols, err := task.P2PClient.BatchRetrieve(ctx, allSymbols, reqCount, actionID) - if err != nil { - fields[logtrace.FieldError] = err.Error() - logtrace.Error(ctx, "batch retrieve failed", fields) - return "", "", fmt.Errorf("batch retrieve symbols: %w", err) - } - retrieveMS := time.Since(retrieveStart).Milliseconds() - logtrace.Info(ctx, "download: batch retrieve ok", logtrace.Fields{"action_id": actionID, "received": len(symbols), "ms": time.Since(rStart).Milliseconds()}) - decodeStart := time.Now() - dStart := time.Now() - logtrace.Info(ctx, "download: decode start", logtrace.Fields{"action_id": actionID}) - decodeInfo, err := task.RQ.Decode(ctx, adaptors.DecodeRequest{ActionID: actionID, Symbols: symbols, Layout: layout}) - if err != nil { - fields[logtrace.FieldError] = err.Error() - logtrace.Error(ctx, "decode failed", fields) - return "", "", fmt.Errorf("decode symbols using RaptorQ: %w", err) - } - decodeMS := time.Since(decodeStart).Milliseconds() - logtrace.Info(ctx, "download: decode ok", logtrace.Fields{"action_id": actionID, "ms": time.Since(dStart).Milliseconds(), "tmp_dir": decodeInfo.DecodeTmpDir, "file_path": decodeInfo.FilePath}) + rStart := time.Now() + logtrace.Info(ctx, "download: prepare decode", logtrace.Fields{"action_id": actionID}) + _, writeSymbol, cleanup, ws, perr := task.RQ.PrepareDecode(ctx, actionID, layout) + if perr != nil { + fields[logtrace.FieldError] = perr.Error() + logtrace.Error(ctx, "prepare decode failed", fields) + return "", "", fmt.Errorf("prepare decode workspace: %w", perr) + } + writer := func(symbolID string, data []byte) error { + _, werr := writeSymbol(-1, symbolID, data) + return werr + } + logtrace.Info(ctx, "download: batch retrieve start", logtrace.Fields{"action_id": actionID, "requested": reqCount, "total_candidates": totalSymbols}) + // We ignore the returned map since symbols are streamed to disk via writer + resultMap, err := task.P2PClient.BatchRetrieve(ctx, allSymbols, reqCount, actionID, writer) + if err != nil { + fields[logtrace.FieldError] = err.Error() + logtrace.Error(ctx, "batch retrieve failed", fields) + if cleanup != nil { _ = cleanup() } + return "", "", fmt.Errorf("batch retrieve symbols: %w", err) + } + retrieveMS := time.Since(retrieveStart).Milliseconds() + logtrace.Info(ctx, "download: batch retrieve ok", logtrace.Fields{"action_id": actionID, "received": len(resultMap), "ms": time.Since(rStart).Milliseconds()}) + decodeStart := time.Now() + dStart := time.Now() + logtrace.Info(ctx, "download: decode start", logtrace.Fields{"action_id": actionID}) + decodeInfo, derr := task.RQ.DecodeFromPrepared(ctx, ws, layout) + if derr != nil { + fields[logtrace.FieldError] = derr.Error() + logtrace.Error(ctx, "decode failed", fields) + if cleanup != nil { _ = cleanup() } + return "", "", fmt.Errorf("decode symbols using RaptorQ: %w", derr) + } + decodeMS := time.Since(decodeStart).Milliseconds() + logtrace.Info(ctx, "download: decode ok", logtrace.Fields{"action_id": actionID, "ms": time.Since(dStart).Milliseconds(), "tmp_dir": decodeInfo.DecodeTmpDir, "file_path": decodeInfo.FilePath}) // Emit timing metrics for network retrieval and decode phases logtrace.Debug(ctx, "download: timing", logtrace.Fields{"action_id": actionID, "retrieve_ms": retrieveMS, "decode_ms": decodeMS}) diff --git a/tests/integration/p2p/p2p_integration_test.go b/tests/integration/p2p/p2p_integration_test.go index a856211b..a0191b63 100644 --- a/tests/integration/p2p/p2p_integration_test.go +++ b/tests/integration/p2p/p2p_integration_test.go @@ -122,7 +122,9 @@ func TestP2PBasicIntegration(t *testing.T) { } // Now try batch retrieve - retrieved, err := services[0].BatchRetrieve(ctx, expectedKeys, batchSize, taskID) + retrieved, err := services[0].BatchRetrieve(ctx, expectedKeys, batchSize, taskID, func(symbolID string, data []byte) error { + return nil + }) require.NoError(t, err) require.Equal(t, batchSize, len(retrieved), "Expected %d items, got %d", batchSize, len(retrieved)) From c8f66f3c6980fc886ca28838b41bbc8fa8446cfb Mon Sep 17 00:00:00 2001 From: J Bilal rafique Date: Tue, 21 Oct 2025 19:48:15 +0500 Subject: [PATCH 2/4] Optimize Symbol Fetch --- p2p/kademlia/dht.go | 47 +++++++++++++++++++++++++++++++-------------- 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/p2p/kademlia/dht.go b/p2p/kademlia/dht.go index d84127ea..16062d17 100644 --- a/p2p/kademlia/dht.go +++ b/p2p/kademlia/dht.go @@ -24,10 +24,11 @@ import ( "github.com/LumeraProtocol/supernode/v2/pkg/logtrace" "github.com/LumeraProtocol/supernode/v2/pkg/lumera" ltc "github.com/LumeraProtocol/supernode/v2/pkg/net/credentials" - "github.com/LumeraProtocol/supernode/v2/pkg/storage" - "github.com/LumeraProtocol/supernode/v2/pkg/storage/memory" - "github.com/LumeraProtocol/supernode/v2/pkg/storage/rqstore" - "github.com/LumeraProtocol/supernode/v2/pkg/utils" + "github.com/LumeraProtocol/supernode/v2/pkg/storage" + "github.com/LumeraProtocol/supernode/v2/pkg/storage/memory" + "github.com/LumeraProtocol/supernode/v2/pkg/storage/rqstore" + "github.com/LumeraProtocol/supernode/v2/pkg/utils" + "github.com/LumeraProtocol/supernode/v2/pkg/p2pmetrics" ) const ( @@ -723,11 +724,12 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, result[key] = nil } - foundLocalCount, err = s.fetchAndAddLocalKeys(ctx, hexKeys, &resMap, required) - if err != nil { - return nil, fmt.Errorf("fetch and add local keys: %v", err) - } - // Found locally count is logged via summary below; no external metrics + foundLocalCount, err = s.fetchAndAddLocalKeys(ctx, hexKeys, &resMap, required) + if err != nil { + return nil, fmt.Errorf("fetch and add local keys: %v", err) + } + // Report locally found count for metrics + p2pmetrics.ReportFoundLocal(p2pmetrics.TaskIDFromContext(ctx), int(foundLocalCount)) if foundLocalCount >= required { logtrace.Debug(ctx, "DHT BatchRetrieve satisfied from local storage", logtrace.Fields{ @@ -918,8 +920,8 @@ func (s *DHT) iterateBatchGetValues(ctx context.Context, nodes map[string]*Node, var mu sync.Mutex // To protect the firstErr foundCount := int32(0) - gctx, cancel := context.WithCancel(ctx) // Create a cancellable context - defer cancel() + gctx, cancel := context.WithCancel(ctx) // Create a cancellable context + defer cancel() for nodeID := range fetchMap { node, ok := nodes[nodeID] if !ok { @@ -964,14 +966,23 @@ func (s *DHT) iterateBatchGetValues(ctx context.Context, nodes map[string]*Node, return } - decompressedData, err := s.doBatchGetValuesCall(gctx, node, requestKeys) + callStart := time.Now() + decompressedData, err := s.doBatchGetValuesCall(gctx, node, requestKeys) if err != nil { mu.Lock() if firstErr == nil { firstErr = err } mu.Unlock() - // per-node metrics removed; logs retained + // record failed RPC for metrics + p2pmetrics.RecordRetrieve(p2pmetrics.TaskIDFromContext(ctx), p2pmetrics.Call{ + IP: node.IP, + Address: node.String(), + Keys: 0, + Success: false, + Error: err.Error(), + DurationMS: time.Since(callStart).Milliseconds(), + }) return } @@ -1001,7 +1012,15 @@ func (s *DHT) iterateBatchGetValues(ctx context.Context, nodes map[string]*Node, } } - // per-node metrics removed; logs retained + // record successful RPC per-node (returned may be 0). Success is true when no error. + p2pmetrics.RecordRetrieve(p2pmetrics.TaskIDFromContext(ctx), p2pmetrics.Call{ + IP: node.IP, + Address: node.String(), + Keys: returned, + Success: true, + Error: "", + DurationMS: time.Since(callStart).Milliseconds(), + }) }(node, nodeID) } From 98c732d149d0a4c3911e4a9207bcd2aa661beb49 Mon Sep 17 00:00:00 2001 From: J Bilal rafique Date: Wed, 22 Oct 2025 18:03:34 +0500 Subject: [PATCH 3/4] Optimize write --- p2p/kademlia/dht.go | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/p2p/kademlia/dht.go b/p2p/kademlia/dht.go index 16062d17..ad623d2c 100644 --- a/p2p/kademlia/dht.go +++ b/p2p/kademlia/dht.go @@ -639,7 +639,7 @@ func (s *DHT) doMultiWorkers(ctx context.Context, iterativeType int, target []by return responses } -func (s *DHT) fetchAndAddLocalKeys(ctx context.Context, hexKeys []string, result *sync.Map, req int32) (count int32, err error) { +func (s *DHT) fetchAndAddLocalKeys(ctx context.Context, hexKeys []string, result *sync.Map, req int32, writer func(symbolID string, data []byte) error) (count int32, err error) { batchSize := 5000 // Process in batches @@ -672,7 +672,16 @@ func (s *DHT) fetchAndAddLocalKeys(ctx context.Context, hexKeys []string, result for i, val := range localValues { if len(val) > 0 { count++ - result.Store(batchHexKeys[i], val) + // When writer is provided, call it and store empty marker + // Otherwise store full data in memory + if writer != nil { + if err := writer(batchHexKeys[i], val); err != nil { + logtrace.Error(ctx, "writer error for local key", logtrace.Fields{"key": batchHexKeys[i], logtrace.FieldError: err.Error()}) + } + result.Store(batchHexKeys[i], []byte{}) // Empty marker + } else { + result.Store(batchHexKeys[i], val) // Full data + } if count >= req { return count, nil } @@ -693,6 +702,12 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, hashes := make([][]byte, len(keys)) defer func() { + // Skip building result map when writer is provided + // Writer stores data to disk; resMap only has empty markers for deduplication + if writer != nil { + return + } + resMap.Range(func(key, value interface{}) bool { hexKey := key.(string) valBytes := value.([]byte) @@ -724,7 +739,7 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, result[key] = nil } - foundLocalCount, err = s.fetchAndAddLocalKeys(ctx, hexKeys, &resMap, required) + foundLocalCount, err = s.fetchAndAddLocalKeys(ctx, hexKeys, &resMap, required, writer) if err != nil { return nil, fmt.Errorf("fetch and add local keys: %v", err) } @@ -989,7 +1004,13 @@ func (s *DHT) iterateBatchGetValues(ctx context.Context, nodes map[string]*Node, returned := 0 for k, v := range decompressedData { if len(v.Value) > 0 { - _, loaded := resMap.LoadOrStore(k, v.Value) + // When writer is provided, only store empty marker to save memory + // The writer already persists data to disk for RaptorQ + storeVal := v.Value + if writer != nil { + storeVal = []byte{} // Empty marker for deduplication only + } + _, loaded := resMap.LoadOrStore(k, storeVal) if !loaded { if writer != nil { // decode k (hex) back to base58 key if your writer expects that From 62300facb6a1652809f6f8763741ec83c3e51847 Mon Sep 17 00:00:00 2001 From: J Bilal rafique Date: Mon, 27 Oct 2025 14:49:30 +0500 Subject: [PATCH 4/4] Only count success writes --- p2p/kademlia/dht.go | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/p2p/kademlia/dht.go b/p2p/kademlia/dht.go index ad623d2c..7ef8ceb4 100644 --- a/p2p/kademlia/dht.go +++ b/p2p/kademlia/dht.go @@ -671,16 +671,18 @@ func (s *DHT) fetchAndAddLocalKeys(ctx context.Context, hexKeys []string, result // Populate the result map with the local values and count the found keys for i, val := range localValues { if len(val) > 0 { - count++ // When writer is provided, call it and store empty marker // Otherwise store full data in memory if writer != nil { if err := writer(batchHexKeys[i], val); err != nil { logtrace.Error(ctx, "writer error for local key", logtrace.Fields{"key": batchHexKeys[i], logtrace.FieldError: err.Error()}) + continue // Skip counting failed writes } result.Store(batchHexKeys[i], []byte{}) // Empty marker + count++ // Only count successful writes } else { result.Store(batchHexKeys[i], val) // Full data + count++ // Count found data } if count >= req { return count, nil @@ -1012,22 +1014,28 @@ func (s *DHT) iterateBatchGetValues(ctx context.Context, nodes map[string]*Node, } _, loaded := resMap.LoadOrStore(k, storeVal) if !loaded { + writeSuccess := true if writer != nil { // decode k (hex) back to base58 key if your writer expects that // or just pass the hex; you control the writer side. if err := writer(k, v.Value); err != nil { - // you can choose to log and continue, or treat as failure - // here we'll log and continue to avoid losing the rest + // Log error and mark write as failed logtrace.Error(ctx, "writer error", logtrace.Fields{"key": k, logtrace.FieldError: err.Error()}) + writeSuccess = false + // Remove from resMap since write failed + resMap.Delete(k) } } - atomic.AddInt32(&foundCount, 1) - returned++ - if atomic.LoadInt32(&foundCount) >= int32(req-alreadyFound) { - cancel() // Cancel context to stop other goroutines - // don't early return; record metric and exit goroutine - break + // Only count if write succeeded (or no writer provided) + if writeSuccess { + atomic.AddInt32(&foundCount, 1) + returned++ + if atomic.LoadInt32(&foundCount) >= int32(req-alreadyFound) { + cancel() // Cancel context to stop other goroutines + // don't early return; record metric and exit goroutine + break + } } } }