Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions p2p/kademlia/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ const (
defaultDeleteDataInterval = 11 * time.Hour
delKeysCountThreshold = 10
lowSpaceThreshold = 50 // GB
batchStoreSize = 2500
batchRetrieveSize = 1000
storeSameSymbolsBatchConcurrency = 3
storeSymbolsBatchConcurrency = 3.0
fetchSymbolsBatchConcurrency = 6
minimumDataStoreSuccessRate = 75.0

maxIterations = 4
Expand Down Expand Up @@ -744,10 +744,10 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
return result, nil
}

batchSize := batchStoreSize
batchSize := batchRetrieveSize
var networkFound int32
totalBatches := int(math.Ceil(float64(required) / float64(batchSize)))
parallelBatches := int(math.Min(float64(totalBatches), storeSymbolsBatchConcurrency))
parallelBatches := int(math.Min(float64(totalBatches), float64(fetchSymbolsBatchConcurrency)))

semaphore := make(chan struct{}, parallelBatches)
var wg sync.WaitGroup
Expand Down Expand Up @@ -785,7 +785,10 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
wg.Wait()

netFound := int(atomic.LoadInt32(&networkFound))
s.metrics.RecordBatchRetrieve(len(keys), int(required), int(foundLocalCount), netFound, time.Duration(time.Since(start).Milliseconds())) // NEW
// Record batch retrieve stats for internal DHT snapshot window
s.metrics.RecordBatchRetrieve(len(keys), int(required), int(foundLocalCount), netFound, time.Since(start))
// Also feed retrieve counts into the per-task collector for stream events
p2pmetrics.SetRetrieveBatchSummary(p2pmetrics.TaskIDFromContext(ctx), len(keys), int(required), int(foundLocalCount), netFound, time.Since(start).Milliseconds())

return result, nil
}
Expand Down
26 changes: 26 additions & 0 deletions pkg/p2pmetrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ func BuildStoreEventPayloadFromCollector(taskID string) map[string]any {
// retrieveSession accumulates per-task retrieval metrics that are later
// flattened into the download stream-event payload.
type retrieveSession struct
CallsByIP map[string][]Call // per-IP call records captured during retrieval
FoundLocal int // symbols found in the local store
FoundNet int // symbols fetched from the network (set via SetRetrieveBatchSummary)
FoundNet int
Keys int // total keys requested in the batch retrieve
Required int // minimum number of symbols required for decode
RetrieveMS int64 // elapsed retrieve time, milliseconds
DecodeMS int64 // elapsed decode time, milliseconds
Expand Down Expand Up @@ -234,6 +237,23 @@ func StopRetrieveCapture(taskID string) {
UnregisterFoundLocalHook(taskID)
}

// SetRetrieveBatchSummary sets counts for a retrieval attempt.
//
// keys, required, foundLocal and foundNet are the batch-retrieve counters
// and retrieveMS is the elapsed retrieve time in milliseconds. They are
// stored on the task's retrieveSession, which is created lazily if the
// summary arrives before capture starts. A no-op when taskID is empty.
//
// NOTE(review): retrieveSessions.m is read and written here with no visible
// locking — confirm that retrieveSessions (or every caller) serializes
// access, otherwise concurrent calls are a data race on the map.
func SetRetrieveBatchSummary(taskID string, keys, required, foundLocal, foundNet int, retrieveMS int64) {
if taskID == "" {
return
}
// Lazily create the session so a summary can be recorded even when no
// capture was started for this task.
s := retrieveSessions.m[taskID]
if s == nil {
s = &retrieveSession{CallsByIP: map[string][]Call{}}
retrieveSessions.m[taskID] = s
}
s.Keys = keys
s.Required = required
s.FoundLocal = foundLocal
s.FoundNet = foundNet
s.RetrieveMS = retrieveMS
}

// SetRetrieveSummary sets timing info for retrieve/decode phases.
func SetRetrieveSummary(taskID string, retrieveMS, decodeMS int64) {
if taskID == "" {
Expand All @@ -254,7 +274,10 @@ func BuildDownloadEventPayloadFromCollector(taskID string) map[string]any {
if s == nil {
return map[string]any{
"retrieve": map[string]any{
"keys": 0,
"required": 0,
"found_local": 0,
"found_net": 0,
"retrieve_ms": int64(0),
"decode_ms": int64(0),
"calls_by_ip": map[string][]Call{},
Expand All @@ -263,7 +286,10 @@ func BuildDownloadEventPayloadFromCollector(taskID string) map[string]any {
}
return map[string]any{
"retrieve": map[string]any{
"keys": s.Keys,
"required": s.Required,
"found_local": s.FoundLocal,
"found_net": s.FoundNet,
"retrieve_ms": s.RetrieveMS,
"decode_ms": s.DecodeMS,
"calls_by_ip": s.CallsByIP,
Expand Down
15 changes: 14 additions & 1 deletion supernode/services/cascade/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/LumeraProtocol/supernode/v2/supernode/services/common"
)

const targetRequiredPercent = 17

type DownloadRequest struct {
ActionID string
}
Expand Down Expand Up @@ -156,6 +158,11 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout(

totalSymbols := len(allSymbols)
fields["totalSymbols"] = totalSymbols
// Compute target requirement (reporting only; does not change behavior)
targetRequiredCount := (totalSymbols*targetRequiredPercent + 99) / 100
if targetRequiredCount < 1 && totalSymbols > 0 {
targetRequiredCount = 1
}
logtrace.Info(ctx, "Retrieving all symbols for decode", fields)

// Enable retrieve metrics capture for this action
Expand Down Expand Up @@ -190,7 +197,13 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout(

// Set minimal retrieve summary and emit event strictly from internal collector
cm.SetRetrieveSummary(actionID, retrieveMS, decodeMS)
if b, err := json.MarshalIndent(cm.BuildDownloadEventPayloadFromCollector(actionID), "", " "); err == nil {
payload := cm.BuildDownloadEventPayloadFromCollector(actionID)
if retrieve, ok := payload["retrieve"].(map[string]any); ok {
retrieve["target_required_percent"] = targetRequiredPercent
retrieve["target_required_count"] = targetRequiredCount
retrieve["total_symbols"] = totalSymbols
}
if b, err := json.MarshalIndent(payload, "", " "); err == nil {
task.streamDownloadEvent(SupernodeEventTypeArtefactsDownloaded, string(b), "", "", send)
}

Expand Down