From 131407745abd585d20cdbd22eac827005d0f5a5b Mon Sep 17 00:00:00 2001 From: Matee Ullah Malik Date: Mon, 22 Sep 2025 18:53:51 +0500 Subject: [PATCH] Add more metrics in p2p --- p2p/kademlia/dht.go | 13 ++++++++----- pkg/p2pmetrics/metrics.go | 26 ++++++++++++++++++++++++++ supernode/services/cascade/download.go | 15 ++++++++++++++- 3 files changed, 48 insertions(+), 6 deletions(-) diff --git a/p2p/kademlia/dht.go b/p2p/kademlia/dht.go index 0209bd73..00afad75 100644 --- a/p2p/kademlia/dht.go +++ b/p2p/kademlia/dht.go @@ -41,9 +41,9 @@ const ( defaultDeleteDataInterval = 11 * time.Hour delKeysCountThreshold = 10 lowSpaceThreshold = 50 // GB - batchStoreSize = 2500 + batchRetrieveSize = 1000 storeSameSymbolsBatchConcurrency = 3 - storeSymbolsBatchConcurrency = 3.0 + fetchSymbolsBatchConcurrency = 6 minimumDataStoreSuccessRate = 75.0 maxIterations = 4 @@ -744,10 +744,10 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, return result, nil } - batchSize := batchStoreSize + batchSize := batchRetrieveSize var networkFound int32 totalBatches := int(math.Ceil(float64(required) / float64(batchSize))) - parallelBatches := int(math.Min(float64(totalBatches), storeSymbolsBatchConcurrency)) + parallelBatches := int(math.Min(float64(totalBatches), float64(fetchSymbolsBatchConcurrency))) semaphore := make(chan struct{}, parallelBatches) var wg sync.WaitGroup @@ -785,7 +785,10 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, wg.Wait() netFound := int(atomic.LoadInt32(&networkFound)) - s.metrics.RecordBatchRetrieve(len(keys), int(required), int(foundLocalCount), netFound, time.Duration(time.Since(start).Milliseconds())) // NEW + // Record batch retrieve stats for internal DHT snapshot window + s.metrics.RecordBatchRetrieve(len(keys), int(required), int(foundLocalCount), netFound, time.Since(start)) + // Also feed retrieve counts into the per-task collector for stream events + p2pmetrics.SetRetrieveBatchSummary(p2pmetrics.TaskIDFromContext(ctx), len(keys), int(required), int(foundLocalCount), netFound, time.Since(start).Milliseconds()) return result, nil } diff --git a/pkg/p2pmetrics/metrics.go b/pkg/p2pmetrics/metrics.go index b483bb1d..945225db 100644 --- a/pkg/p2pmetrics/metrics.go +++ b/pkg/p2pmetrics/metrics.go @@ -199,6 +199,9 @@ func BuildStoreEventPayloadFromCollector(taskID string) map[string]any { type retrieveSession struct { CallsByIP map[string][]Call FoundLocal int + FoundNet int + Keys int + Required int RetrieveMS int64 DecodeMS int64 } @@ -234,6 +237,23 @@ func StopRetrieveCapture(taskID string) { UnregisterFoundLocalHook(taskID) } +// SetRetrieveBatchSummary sets counts for a retrieval attempt. +func SetRetrieveBatchSummary(taskID string, keys, required, foundLocal, foundNet int, retrieveMS int64) { + if taskID == "" { + return + } + s := retrieveSessions.m[taskID] + if s == nil { + s = &retrieveSession{CallsByIP: map[string][]Call{}} + retrieveSessions.m[taskID] = s + } + s.Keys = keys + s.Required = required + s.FoundLocal = foundLocal + s.FoundNet = foundNet + s.RetrieveMS = retrieveMS +} + // SetRetrieveSummary sets timing info for retrieve/decode phases. func SetRetrieveSummary(taskID string, retrieveMS, decodeMS int64) { if taskID == "" { @@ -254,7 +274,10 @@ func BuildDownloadEventPayloadFromCollector(taskID string) map[string]any { if s == nil { return map[string]any{ "retrieve": map[string]any{ + "keys": 0, + "required": 0, "found_local": 0, + "found_net": 0, "retrieve_ms": int64(0), "decode_ms": int64(0), "calls_by_ip": map[string][]Call{}, @@ -263,7 +286,10 @@ func BuildDownloadEventPayloadFromCollector(taskID string) map[string]any { } return map[string]any{ "retrieve": map[string]any{ + "keys": s.Keys, + "required": s.Required, "found_local": s.FoundLocal, + "found_net": s.FoundNet, "retrieve_ms": s.RetrieveMS, "decode_ms": s.DecodeMS, "calls_by_ip": s.CallsByIP, diff --git a/supernode/services/cascade/download.go b/supernode/services/cascade/download.go index bfcc25a9..b8220045 100644 --- a/supernode/services/cascade/download.go +++ b/supernode/services/cascade/download.go @@ -20,6 +20,8 @@ import ( "github.com/LumeraProtocol/supernode/v2/supernode/services/common" ) +const targetRequiredPercent = 17 + type DownloadRequest struct { ActionID string } @@ -156,6 +158,11 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout( totalSymbols := len(allSymbols) fields["totalSymbols"] = totalSymbols + // Compute target requirement (reporting only; does not change behavior) + targetRequiredCount := (totalSymbols*targetRequiredPercent + 99) / 100 + if targetRequiredCount < 1 && totalSymbols > 0 { + targetRequiredCount = 1 + } logtrace.Info(ctx, "Retrieving all symbols for decode", fields) // Enable retrieve metrics capture for this action @@ -190,7 +197,13 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout( // Set minimal retrieve summary and emit event strictly from internal collector cm.SetRetrieveSummary(actionID, retrieveMS, decodeMS) - if b, err := json.MarshalIndent(cm.BuildDownloadEventPayloadFromCollector(actionID), "", " "); err == nil { + payload := cm.BuildDownloadEventPayloadFromCollector(actionID) + if retrieve, ok := payload["retrieve"].(map[string]any); ok { + retrieve["target_required_percent"] = targetRequiredPercent + retrieve["target_required_count"] = targetRequiredCount + retrieve["total_symbols"] = totalSymbols + } + if b, err := json.MarshalIndent(payload, "", " "); err == nil { task.streamDownloadEvent(SupernodeEventTypeArtefactsDownloaded, string(b), "", "", send) }