Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Client interface {
// BatchRetrieve retrieve data from the kademlia network by keys
// reqCount is the minimum number of keys that are actually required by the caller
// to successfully perform the required operation
BatchRetrieve(ctx context.Context, keys []string, reqCount int, txID string, localOnly ...bool) (map[string][]byte, error)
BatchRetrieve(ctx context.Context, keys []string, reqCount int, txID string, symbolWriter func(symbolID string, data []byte) error, localOnly ...bool) (map[string][]byte, error)
// Store stores data to the network, which will trigger the iterative store message
// - the base58 encoded identifier will be returned
Store(ctx context.Context, data []byte, typ int) (string, error)
Expand Down
199 changes: 129 additions & 70 deletions p2p/kademlia/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ import (
"github.com/LumeraProtocol/supernode/v2/pkg/logtrace"
"github.com/LumeraProtocol/supernode/v2/pkg/lumera"
ltc "github.com/LumeraProtocol/supernode/v2/pkg/net/credentials"
"github.com/LumeraProtocol/supernode/v2/pkg/storage"
"github.com/LumeraProtocol/supernode/v2/pkg/storage/memory"
"github.com/LumeraProtocol/supernode/v2/pkg/storage/rqstore"
"github.com/LumeraProtocol/supernode/v2/pkg/utils"
"github.com/LumeraProtocol/supernode/v2/pkg/storage"
"github.com/LumeraProtocol/supernode/v2/pkg/storage/memory"
"github.com/LumeraProtocol/supernode/v2/pkg/storage/rqstore"
"github.com/LumeraProtocol/supernode/v2/pkg/utils"
"github.com/LumeraProtocol/supernode/v2/pkg/p2pmetrics"
)

const (
Expand Down Expand Up @@ -638,7 +639,7 @@ func (s *DHT) doMultiWorkers(ctx context.Context, iterativeType int, target []by
return responses
}

func (s *DHT) fetchAndAddLocalKeys(ctx context.Context, hexKeys []string, result *sync.Map, req int32) (count int32, err error) {
func (s *DHT) fetchAndAddLocalKeys(ctx context.Context, hexKeys []string, result *sync.Map, req int32, writer func(symbolID string, data []byte) error) (count int32, err error) {
batchSize := 5000

// Process in batches
Expand Down Expand Up @@ -670,8 +671,19 @@ func (s *DHT) fetchAndAddLocalKeys(ctx context.Context, hexKeys []string, result
// Populate the result map with the local values and count the found keys
for i, val := range localValues {
if len(val) > 0 {
count++
result.Store(batchHexKeys[i], val)
// When writer is provided, call it and store empty marker
// Otherwise store full data in memory
if writer != nil {
if err := writer(batchHexKeys[i], val); err != nil {
logtrace.Error(ctx, "writer error for local key", logtrace.Fields{"key": batchHexKeys[i], logtrace.FieldError: err.Error()})
continue // Skip counting failed writes
}
result.Store(batchHexKeys[i], []byte{}) // Empty marker
count++ // Only count successful writes
} else {
result.Store(batchHexKeys[i], val) // Full data
count++ // Count found data
}
if count >= req {
return count, nil
}
Expand All @@ -682,8 +694,8 @@ func (s *DHT) fetchAndAddLocalKeys(ctx context.Context, hexKeys []string, result
return count, err
}

func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, txID string, localOnly ...bool) (result map[string][]byte, err error) {
logtrace.Debug(ctx, "DHT BatchRetrieve begin", logtrace.Fields{"txid": txID, "keys": len(keys), "required": required})
func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, txID string, writer func(symbolID string, data []byte) error, localOnly ...bool) (result map[string][]byte, err error) {
logtrace.Debug(ctx, "DHT BatchRetrieve begin", logtrace.Fields{"txid": txID, "keys": len(keys), "required": required})
result = make(map[string][]byte)
var resMap sync.Map
var foundLocalCount int32
Expand All @@ -692,6 +704,12 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
hashes := make([][]byte, len(keys))

defer func() {
// Skip building result map when writer is provided
// Writer stores data to disk; resMap only has empty markers for deduplication
if writer != nil {
return
}

resMap.Range(func(key, value interface{}) bool {
hexKey := key.(string)
valBytes := value.([]byte)
Expand Down Expand Up @@ -723,11 +741,12 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
result[key] = nil
}

foundLocalCount, err = s.fetchAndAddLocalKeys(ctx, hexKeys, &resMap, required)
if err != nil {
return nil, fmt.Errorf("fetch and add local keys: %v", err)
}
// Found locally count is logged via summary below; no external metrics
foundLocalCount, err = s.fetchAndAddLocalKeys(ctx, hexKeys, &resMap, required, writer)
if err != nil {
return nil, fmt.Errorf("fetch and add local keys: %v", err)
}
// Report locally found count for metrics
p2pmetrics.ReportFoundLocal(p2pmetrics.TaskIDFromContext(ctx), int(foundLocalCount))

if foundLocalCount >= required {
logtrace.Debug(ctx, "DHT BatchRetrieve satisfied from local storage", logtrace.Fields{
Expand Down Expand Up @@ -805,34 +824,35 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
break
}

wg.Add(1)
go func(start, end int) {
defer wg.Done()

if err := sem.Acquire(gctx, 1); err != nil {
return
}
defer sem.Release(1)

if atomic.LoadInt32(&networkFound)+int32(foundLocalCount) >= int32(required) {
return
}

s.processBatch(
gctx,
keys[start:end],
hexKeys[start:end],
globalClosestContacts,
&closestMu,
knownNodes, &knownMu,
&resMap,
required,
foundLocalCount,
&networkFound,
cancel,
txID,
)
}(start, end)
wg.Add(1)
go func(start, end int) {
defer wg.Done()

if err := sem.Acquire(gctx, 1); err != nil {
return
}
defer sem.Release(1)

if atomic.LoadInt32(&networkFound)+int32(foundLocalCount) >= int32(required) {
return
}

s.processBatch(
gctx,
keys[start:end],
hexKeys[start:end],
globalClosestContacts,
&closestMu,
knownNodes, &knownMu,
&resMap,
required,
foundLocalCount,
&networkFound,
cancel,
txID,
writer,
)
}(start, end)
}

wg.Wait()
Expand Down Expand Up @@ -866,6 +886,7 @@ func (s *DHT) processBatch(
networkFound *int32,
cancel context.CancelFunc,
txID string,
writer func(symbolID string, data []byte) error,
) {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -893,14 +914,14 @@ func (s *DHT) processBatch(
}
}

foundCount, batchErr := s.iterateBatchGetValues(
ctx, knownNodes, batchHexKeys, fetchMap, resMap, required, foundLocalCount+atomic.LoadInt32(networkFound),
)
if batchErr != nil {
logtrace.Error(ctx, "Iterate batch get values failed", logtrace.Fields{
logtrace.FieldModule: "dht", "txid": txID, logtrace.FieldError: batchErr.Error(),
})
}
foundCount, batchErr := s.iterateBatchGetValues(
ctx, knownNodes, batchHexKeys, fetchMap, resMap, required, foundLocalCount+atomic.LoadInt32(networkFound), writer,
)
if batchErr != nil {
logtrace.Error(ctx, "Iterate batch get values failed", logtrace.Fields{
logtrace.FieldModule: "dht", "txid": txID, logtrace.FieldError: batchErr.Error(),
})
}

atomic.AddInt32(networkFound, int32(foundCount))
if atomic.LoadInt32(networkFound)+int32(foundLocalCount) >= int32(required) {
Expand All @@ -909,21 +930,20 @@ func (s *DHT) processBatch(
}

func (s *DHT) iterateBatchGetValues(ctx context.Context, nodes map[string]*Node, hexKeys []string, fetchMap map[string][]int,
resMap *sync.Map, req, alreadyFound int32) (int, error) {
sem := semaphore.NewWeighted(int64(storeSameSymbolsBatchConcurrency))
resMap *sync.Map, req, alreadyFound int32, writer func(symbolID string, data []byte) error) (int, error) {
sem := semaphore.NewWeighted(int64(storeSameSymbolsBatchConcurrency))
var wg sync.WaitGroup
var firstErr error
var mu sync.Mutex // To protect the firstErr
foundCount := int32(0)

gctx, cancel := context.WithCancel(ctx) // Create a cancellable context
defer cancel()

for nodeID := range fetchMap {
node, ok := nodes[nodeID]
if !ok {
continue
}
gctx, cancel := context.WithCancel(ctx) // Create a cancellable context
defer cancel()
for nodeID := range fetchMap {
node, ok := nodes[nodeID]
if !ok {
continue
}

if s.ignorelist.Banned(node) {
logtrace.Debug(ctx, "Ignore banned node in iterate batch get values", logtrace.Fields{
Expand Down Expand Up @@ -963,34 +983,73 @@ func (s *DHT) iterateBatchGetValues(ctx context.Context, nodes map[string]*Node,
return
}

decompressedData, err := s.doBatchGetValuesCall(gctx, node, requestKeys)
callStart := time.Now()
decompressedData, err := s.doBatchGetValuesCall(gctx, node, requestKeys)
if err != nil {
mu.Lock()
if firstErr == nil {
firstErr = err
}
mu.Unlock()
// per-node metrics removed; logs retained
// record failed RPC for metrics
p2pmetrics.RecordRetrieve(p2pmetrics.TaskIDFromContext(ctx), p2pmetrics.Call{
IP: node.IP,
Address: node.String(),
Keys: 0,
Success: false,
Error: err.Error(),
DurationMS: time.Since(callStart).Milliseconds(),
})
return
}

returned := 0
for k, v := range decompressedData {
if len(v.Value) > 0 {
_, loaded := resMap.LoadOrStore(k, v.Value)
// When writer is provided, only store empty marker to save memory
// The writer already persists data to disk for RaptorQ
storeVal := v.Value
if writer != nil {
storeVal = []byte{} // Empty marker for deduplication only
}
_, loaded := resMap.LoadOrStore(k, storeVal)
if !loaded {
atomic.AddInt32(&foundCount, 1)
returned++
if atomic.LoadInt32(&foundCount) >= int32(req-alreadyFound) {
cancel() // Cancel context to stop other goroutines
// don't early return; record metric and exit goroutine
break
writeSuccess := true
if writer != nil {
// decode k (hex) back to base58 key if your writer expects that
// or just pass the hex; you control the writer side.
if err := writer(k, v.Value); err != nil {
// Log error and mark write as failed
logtrace.Error(ctx, "writer error", logtrace.Fields{"key": k, logtrace.FieldError: err.Error()})
writeSuccess = false
// Remove from resMap since write failed
resMap.Delete(k)
}
}

// Only count if write succeeded (or no writer provided)
if writeSuccess {
atomic.AddInt32(&foundCount, 1)
returned++
if atomic.LoadInt32(&foundCount) >= int32(req-alreadyFound) {
cancel() // Cancel context to stop other goroutines
// don't early return; record metric and exit goroutine
break
}
}
}
}
}

// per-node metrics removed; logs retained
// record successful RPC per-node (returned may be 0). Success is true when no error.
p2pmetrics.RecordRetrieve(p2pmetrics.TaskIDFromContext(ctx), p2pmetrics.Call{
IP: node.IP,
Address: node.String(),
Keys: returned,
Success: true,
Error: "",
DurationMS: time.Since(callStart).Milliseconds(),
})
}(node, nodeID)
}

Expand Down
2 changes: 1 addition & 1 deletion p2p/mocks/Client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,13 @@ func (s *p2p) Retrieve(ctx context.Context, key string, localOnly ...bool) ([]by
}

// BatchRetrieve retrieves the data from the kademlia network
func (s *p2p) BatchRetrieve(ctx context.Context, keys []string, reqCount int, txID string, localOnly ...bool) (map[string][]byte, error) {
func (s *p2p) BatchRetrieve(ctx context.Context, keys []string, reqCount int, txID string, writer func(symbolID string, data []byte) error, localOnly ...bool) (map[string][]byte, error) {

if !s.running {
return nil, errors.New("p2p service is not running")
}

return s.dht.BatchRetrieve(ctx, keys, int32(reqCount), txID, localOnly...)
return s.dht.BatchRetrieve(ctx, keys, int32(reqCount), txID, writer, localOnly...)
}

// Delete delete key in queries node
Expand Down
14 changes: 9 additions & 5 deletions pkg/codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,13 @@ type CreateMetadataResponse struct {

// RaptorQ contains methods for request services from RaptorQ service.
type Codec interface {
// Encode a file
Encode(ctx context.Context, req EncodeRequest) (EncodeResponse, error)
Decode(ctx context.Context, req DecodeRequest) (DecodeResponse, error)
// without generating RaptorQ symbols.
CreateMetadata(ctx context.Context, req CreateMetadataRequest) (CreateMetadataResponse, error)
// Encode a file
Encode(ctx context.Context, req EncodeRequest) (EncodeResponse, error)
Decode(ctx context.Context, req DecodeRequest) (DecodeResponse, error)
// without generating RaptorQ symbols.
CreateMetadata(ctx context.Context, req CreateMetadataRequest) (CreateMetadataResponse, error)
// Streaming decode helpers for writing symbols directly to disk prior to decode
PrepareDecode(ctx context.Context, actionID string, layout Layout) (blockPaths []string,
Write func(block int, symbolID string, data []byte) (string, error), Cleanup func() error, ws *Workspace, err error)
DecodeFromPrepared(ctx context.Context, ws *Workspace, layout Layout) (DecodeResponse, error)
}
30 changes: 23 additions & 7 deletions supernode/adaptors/rq.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ import (

// CodecService wraps codec operations used by cascade
type CodecService interface {
EncodeInput(ctx context.Context, actionID string, filePath string) (EncodeResult, error)
Decode(ctx context.Context, req DecodeRequest) (DecodeResult, error)
EncodeInput(ctx context.Context, actionID string, filePath string) (EncodeResult, error)
Decode(ctx context.Context, req DecodeRequest) (DecodeResult, error)
PrepareDecode(ctx context.Context, actionID string, layout codec.Layout) (blockPaths []string,
Write func(block int, symbolID string, data []byte) (string, error), Cleanup func() error, ws *codec.Workspace, err error)
DecodeFromPrepared(ctx context.Context, ws *codec.Workspace, layout codec.Layout) (DecodeResult, error)
}

type EncodeResult struct {
Expand Down Expand Up @@ -46,9 +49,22 @@ func (c *codecImpl) EncodeInput(ctx context.Context, actionID, filePath string)
}

func (c *codecImpl) Decode(ctx context.Context, req DecodeRequest) (DecodeResult, error) {
res, err := c.codec.Decode(ctx, codec.DecodeRequest{ActionID: req.ActionID, Symbols: req.Symbols, Layout: req.Layout})
if err != nil {
return DecodeResult{}, err
}
return DecodeResult{FilePath: res.FilePath, DecodeTmpDir: res.DecodeTmpDir}, nil
res, err := c.codec.Decode(ctx, codec.DecodeRequest{ActionID: req.ActionID, Symbols: req.Symbols, Layout: req.Layout})
if err != nil {
return DecodeResult{}, err
}
return DecodeResult{FilePath: res.FilePath, DecodeTmpDir: res.DecodeTmpDir}, nil
}

// PrepareDecode delegates to the wrapped codec's PrepareDecode, which sets up a
// disk-backed workspace so RaptorQ symbols can be streamed to disk before the
// final decode (see the Codec interface's streaming decode helpers).
//
// Returned values (forwarded unchanged from the underlying codec):
//   - blockPaths: filesystem paths, presumably one per block — confirm against codec implementation
//   - Write: callback persisting a single symbol for a given block; returns the written path
//   - Cleanup: releases the workspace's on-disk resources
//   - ws: workspace handle to later pass to DecodeFromPrepared
//   - err: non-nil if workspace preparation failed
func (c *codecImpl) PrepareDecode(ctx context.Context, actionID string, layout codec.Layout) (blockPaths []string,
Write func(block int, symbolID string, data []byte) (string, error), Cleanup func() error, ws *codec.Workspace, err error) {
// Pure pass-through: no additional validation or translation at this adaptor layer.
return c.codec.PrepareDecode(ctx, actionID, layout)
}

// DecodeFromPrepared runs the final decode against a workspace previously set up
// by PrepareDecode, then translates the codec's response into the adaptor-level
// DecodeResult. On error the zero DecodeResult is returned alongside the error.
func (c *codecImpl) DecodeFromPrepared(ctx context.Context, ws *codec.Workspace, layout codec.Layout) (DecodeResult, error) {
res, err := c.codec.DecodeFromPrepared(ctx, ws, layout)
if err != nil {
return DecodeResult{}, err
}
// Only FilePath and DecodeTmpDir are surfaced to callers of this adaptor.
return DecodeResult{FilePath: res.FilePath, DecodeTmpDir: res.DecodeTmpDir}, nil
}
Loading
Loading