Skip to content
Closed

fx #204

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/sncli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ require (
github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f // indirect
github.com/dgraph-io/badger/v4 v4.2.0 // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dgraph-io/ristretto/v2 v2.2.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/dvsekhvalnov/jose2go v1.6.0 // indirect
github.com/emicklei/dot v1.6.2 // indirect
Expand Down
6 changes: 4 additions & 2 deletions cmd/sncli/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,12 @@ github.com/dgraph-io/badger/v4 v4.2.0 h1:kJrlajbXXL9DFTNuhhu9yCx7JJa4qpYWxtE8Bzu
github.com/dgraph-io/badger/v4 v4.2.0/go.mod h1:qfCqhPoWDFJRx1gp5QwwyGo8xk1lbHUxvK9nK0OGAak=
github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8=
github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA=
github.com/dgraph-io/ristretto/v2 v2.2.0 h1:bkY3XzJcXoMuELV8F+vS8kzNgicwQFAaGINAEJdWGOM=
github.com/dgraph-io/ristretto/v2 v2.2.0/go.mod h1:RZrm63UmcBAaYWC1DotLYBmTvgkrs0+XhBd7Npn7/zI=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da h1:aIftn67I1fkbMa512G+w+Pxci9hJPB8oMnkcP3iZF38=
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk=
github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
Expand Down
2 changes: 1 addition & 1 deletion p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Client interface {
// BatchRetrieve retrieve data from the kademlia network by keys
// reqCount is the minimum number of keys that are actually required by the caller
// to successfully perform the reuquired operation
BatchRetrieve(ctx context.Context, keys []string, reqCount int, txID string, localOnly ...bool) (map[string][]byte, error)
BatchRetrieve(ctx context.Context, keys []string, reqCount int, txID string, symbolWriter func(symbolID string, data []byte) error, localOnly ...bool) (map[string][]byte, error)
// Store store data to the network, which will trigger the iterative store message
// - the base58 encoded identifier will be returned
Store(ctx context.Context, data []byte, typ int) (string, error)
Expand Down
24 changes: 20 additions & 4 deletions p2p/kademlia/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ func (s *DHT) fetchAndAddLocalKeys(ctx context.Context, hexKeys []string, result
return count, err
}

func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, txID string, localOnly ...bool) (result map[string][]byte, err error) {
func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, txID string, writer func(symbolID string, data []byte) error, localOnly ...bool) (result map[string][]byte, err error) {
start := time.Now()
result = make(map[string][]byte)
var resMap sync.Map
Expand Down Expand Up @@ -779,6 +779,7 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
&networkFound,
cancel,
txID,
writer,
)
}

Expand Down Expand Up @@ -809,6 +810,7 @@ func (s *DHT) processBatch(
networkFound *int32,
cancel context.CancelFunc,
txID string,
writer func(symbolID string, data []byte) error,
) {
defer wg.Done()
defer func() { <-semaphore }()
Expand Down Expand Up @@ -836,7 +838,7 @@ func (s *DHT) processBatch(
}

foundCount, newClosestContacts, batchErr := s.iterateBatchGetValues(
ctx, knownNodes, batchKeys, batchHexKeys, fetchMap, resMap, required, foundLocalCount+atomic.LoadInt32(networkFound),
ctx, knownNodes, batchKeys, batchHexKeys, fetchMap, resMap, required, foundLocalCount+atomic.LoadInt32(networkFound), writer,
)
if batchErr != nil {
logtrace.Error(ctx, "Iterate batch get values failed", logtrace.Fields{
Expand Down Expand Up @@ -886,8 +888,8 @@ func (s *DHT) processBatch(
}

func (s *DHT) iterateBatchGetValues(ctx context.Context, nodes map[string]*Node, keys []string, hexKeys []string, fetchMap map[string][]int,
resMap *sync.Map, req, alreadyFound int32) (int, map[string]*NodeList, error) {
semaphore := make(chan struct{}, storeSameSymbolsBatchConcurrency) // Limit concurrency to 1
resMap *sync.Map, req, alreadyFound int32, writer func(symbolID string, data []byte) error) (int, map[string]*NodeList, error) {
semaphore := make(chan struct{}, storeSameSymbolsBatchConcurrency) // Limit concurrency to 3
closestContacts := make(map[string]*NodeList)
var wg sync.WaitGroup
contactsMap := make(map[string]map[string][]*Node)
Expand All @@ -898,6 +900,10 @@ func (s *DHT) iterateBatchGetValues(ctx context.Context, nodes map[string]*Node,
gctx, cancel := context.WithCancel(ctx) // Create a cancellable context
defer cancel()
for nodeID, node := range nodes {
if _, ok := fetchMap[nodeID]; !ok {
continue
}

if s.ignorelist.Banned(node) {
logtrace.Info(ctx, "Ignore banned node in iterate batch get values", logtrace.Fields{
logtrace.FieldModule: "dht",
Expand Down Expand Up @@ -962,6 +968,16 @@ func (s *DHT) iterateBatchGetValues(ctx context.Context, nodes map[string]*Node,
if len(v.Value) > 0 {
_, loaded := resMap.LoadOrStore(k, v.Value)
if !loaded {
if writer != nil {
// decode k (hex) back to base58 key if your writer expects that
// or just pass the hex; you control the writer side.
if err := writer(k, v.Value); err != nil {
// you can choose to log and continue, or treat as failure
// here we'll log and continue to avoid losing the rest
logtrace.Error(ctx, "writer error", logtrace.Fields{"key": k, logtrace.FieldError: err.Error()})
}
}

atomic.AddInt32(&foundCount, 1)
returned++
if atomic.LoadInt32(&foundCount) >= int32(req-alreadyFound) {
Expand Down
2 changes: 1 addition & 1 deletion p2p/mocks/Client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,13 @@ func (s *p2p) Retrieve(ctx context.Context, key string, localOnly ...bool) ([]by
}

// BatchRetrieve retrive the data from the kademlia network
func (s *p2p) BatchRetrieve(ctx context.Context, keys []string, reqCount int, txID string, localOnly ...bool) (map[string][]byte, error) {
func (s *p2p) BatchRetrieve(ctx context.Context, keys []string, reqCount int, txID string, writer func(symbolID string, data []byte) error, localOnly ...bool) (map[string][]byte, error) {

if !s.running {
return nil, errors.New("p2p service is not running")
}

return s.dht.BatchRetrieve(ctx, keys, int32(reqCount), txID, localOnly...)
return s.dht.BatchRetrieve(ctx, keys, int32(reqCount), txID, writer, localOnly...)
}

// Delete delete key in queries node
Expand Down
3 changes: 3 additions & 0 deletions pkg/codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,7 @@ type Codec interface {
// Encode a file
Encode(ctx context.Context, req EncodeRequest) (EncodeResponse, error)
Decode(ctx context.Context, req DecodeRequest) (DecodeResponse, error)
PrepareDecode(ctx context.Context, actionID string, layout Layout) (blockPaths []string,
Write func(block int, symbolID string, data []byte) (string, error), Cleanup func() error, ws *Workspace, err error)
DecodeFromPrepared(ctx context.Context, ws *Workspace, layout Layout) (DecodeResponse, error)
}
15 changes: 15 additions & 0 deletions supernode/services/cascade/adaptors/mocks/rq_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions supernode/services/cascade/adaptors/rq.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type CodecService interface {
EncodeInput(ctx context.Context, taskID string, path string, dataSize int) (EncodeResult, error)
PrepareDecode(ctx context.Context, actionID string, layout codec.Layout) (blockPaths []string, Write func(block int, symbolID string, data []byte) (string, error), Cleanup func() error, ws *codec.Workspace, err error)
Decode(ctx context.Context, req DecodeRequest) (DecodeResponse, error)
DecodeFromPrepared(ctx context.Context, ws *codec.Workspace, layout codec.Layout) (DecodeResponse, error)
}

// EncodeResult represents the outcome of encoding the input data.
Expand Down Expand Up @@ -79,3 +80,7 @@ func (c *codecImpl) Decode(ctx context.Context, req DecodeRequest) (DecodeRespon
func (c *codecImpl) PrepareDecode(ctx context.Context, actionID string, layout codec.Layout) (blockPaths []string, Write func(block int, symbolID string, data []byte) (string, error), Cleanup func() error, ws *codec.Workspace, err error) {
return
}

func (c *codecImpl) DecodeFromPrepared(ctx context.Context, ws *codec.Workspace, layout codec.Layout) (DecodeResponse, error) {
return DecodeResponse{}, nil
}
22 changes: 15 additions & 7 deletions supernode/services/cascade/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/LumeraProtocol/supernode/v2/pkg/logtrace"
cm "github.com/LumeraProtocol/supernode/v2/pkg/p2pmetrics"
"github.com/LumeraProtocol/supernode/v2/pkg/utils"
"github.com/LumeraProtocol/supernode/v2/supernode/services/cascade/adaptors"
"github.com/LumeraProtocol/supernode/v2/supernode/services/common"
)

Expand Down Expand Up @@ -179,7 +178,20 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout(
if reqCount > totalSymbols {
reqCount = totalSymbols
}
symbols, err := task.P2PClient.BatchRetrieve(ctxRetrieve, allSymbols, reqCount, actionID)

_, writeSymbol, _, ws, perr := task.RQ.PrepareDecode(ctx, actionID, layout)
if perr != nil {
return "", "", fmt.Errorf("prepare decode: %w", perr)
}

// bridge to DHT: it expects SymbolWriter(string, []byte)
writer := func(symbolID string, data []byte) error {
// single-block: always write to block 0
_, err := writeSymbol(0, symbolID, data)
return err
}

_, err := task.P2PClient.BatchRetrieve(ctxRetrieve, allSymbols, reqCount, actionID, writer)
if err != nil {
fields[logtrace.FieldError] = err.Error()
logtrace.Error(ctx, "batch retrieve failed", fields)
Expand All @@ -189,11 +201,7 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout(

// Measure decode duration
decodeStart := time.Now()
decodeInfo, err := task.RQ.Decode(ctx, adaptors.DecodeRequest{
ActionID: actionID,
Symbols: symbols,
Layout: layout,
})
decodeInfo, err := task.RQ.DecodeFromPrepared(ctx, ws, layout)
if err != nil {
fields[logtrace.FieldError] = err.Error()
logtrace.Error(ctx, "decode failed", fields)
Expand Down
6 changes: 4 additions & 2 deletions tests/integration/p2p/p2p_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestP2PBasicIntegration(t *testing.T) {

// Add debug logging
log.Printf("Storing batch with keys: %v", expectedKeys)
err := services[0].StoreBatch(ctx, batchData, 0, taskID)
err := services[0].StoreBatch(ctx, batchData, 0, taskID)
require.NoError(t, err)

// Add immediate verification
Expand All @@ -122,7 +122,9 @@ func TestP2PBasicIntegration(t *testing.T) {
}

// Now try batch retrieve
retrieved, err := services[0].BatchRetrieve(ctx, expectedKeys, batchSize, taskID)
retrieved, err := services[0].BatchRetrieve(ctx, expectedKeys, batchSize, taskID, func(symbolID string, data []byte) error {
return nil
})
require.NoError(t, err)
require.Equal(t, batchSize, len(retrieved), "Expected %d items, got %d", batchSize, len(retrieved))

Expand Down
Loading