From f1100e67f93c3241e1bcfac2f09eb1b9389e3acb Mon Sep 17 00:00:00 2001 From: Matee ullah Date: Mon, 15 Sep 2025 16:39:32 +0500 Subject: [PATCH] improve write deadline for p2p data upload --- p2p/kademlia/network.go | 54 ++++++++++++++++------ supernode/services/cascade/adaptors/p2p.go | 2 +- 2 files changed, 41 insertions(+), 15 deletions(-) diff --git a/p2p/kademlia/network.go b/p2p/kademlia/network.go index 366f0f4a..56e7ac55 100644 --- a/p2p/kademlia/network.go +++ b/p2p/kademlia/network.go @@ -627,20 +627,7 @@ func (s *Network) Call(ctx context.Context, request *Message, isLong bool) (*Mes // ---- retryable RPC helpers ------------------------------------------------- func (s *Network) rpcOnceWrapper(ctx context.Context, cw *connWrapper, remoteAddr string, data []byte, timeout time.Duration, msgType int) (*Message, error) { - - sizeMB := float64(len(data)) / (1024.0 * 1024.0) // data is your gob-encoded message - throughputFloor := 8.0 // MB/s (~64 Mbps) - est := time.Duration(sizeMB / throughputFloor * float64(time.Second)) - base := 1 * time.Second - cushion := 5 * time.Second - - writeDL := base + est + cushion - if writeDL < 5*time.Second { - writeDL = 5 * time.Second - } - if writeDL > timeout-1*time.Second { - writeDL = timeout - 1*time.Second - } + writeDL := calcWriteDeadline(timeout, len(data), 2.0) // target ~2 MB/s retried := false for { @@ -1430,3 +1417,42 @@ func readDeadlineFor(msgType int, overall time.Duration) time.Duration { return overall // Bulk responses keep full budget } } + +// calcWriteDeadline returns a conservative write deadline based on payload size. +// - targetMBps: assumed sustained throughput (lower = more lenient). +// - We reserve some headroom from overall timeout for server processing/response. +func calcWriteDeadline(timeout time.Duration, sizeBytes int, targetMBps float64) time.Duration { + if timeout <= 0 { + timeout = 30 * time.Second + } + // Leave headroom for server processing + response + const reserve = 8 * time.Second + maxBudget := timeout - reserve + if maxBudget < 5*time.Second { + maxBudget = timeout - 1*time.Second + if maxBudget < 3*time.Second { + maxBudget = 3 * time.Second + } + } + + sizeMB := float64(sizeBytes) / (1024.0 * 1024.0) + base := 2 * time.Second + cushion := 5 * time.Second + + // Softer floor: assume ~2 MB/s; increase if you like. + if targetMBps <= 0 { + targetMBps = 2.0 + } + est := time.Duration(sizeMB / targetMBps * float64(time.Second)) + + writeDL := base + est + cushion + + // Ensure a more generous minimum for big-ish payloads + if writeDL < 10*time.Second { + writeDL = 10 * time.Second + } + if writeDL > maxBudget { + writeDL = maxBudget + } + return writeDL +} diff --git a/supernode/services/cascade/adaptors/p2p.go b/supernode/services/cascade/adaptors/p2p.go index b3e6377c..be2eb74c 100644 --- a/supernode/services/cascade/adaptors/p2p.go +++ b/supernode/services/cascade/adaptors/p2p.go @@ -20,7 +20,7 @@ import ( ) const ( - loadSymbolsBatchSize = 5000 + loadSymbolsBatchSize = 3000 // Minimum first-pass coverage to store before returning from Register (percent) storeSymbolsPercent = 18