From 18aeb4e81a299793816a690bc3518769a204db76 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 28 May 2025 13:44:27 +0000 Subject: [PATCH 01/15] Implement advanced bandwidth optimization with adaptive frame sizing and enhanced congestion control Co-Authored-By: fatedier --- pkg/sender/frame.go | 19 ++++++++ pkg/sender/rtt.go | 86 ++++++++++++++++++++++++++++++++++ pkg/sender/sender.go | 102 ++++++++++++++++++++++++++++++++++++++++- pkg/sender/transfer.go | 75 ++++++++++++++++++++++++++++-- 4 files changed, 277 insertions(+), 5 deletions(-) create mode 100644 pkg/sender/rtt.go diff --git a/pkg/sender/frame.go b/pkg/sender/frame.go index b9ae86e..d58083a 100644 --- a/pkg/sender/frame.go +++ b/pkg/sender/frame.go @@ -14,6 +14,7 @@ type SendFrame struct { retryTimes int hasAck bool transferID int // ID of the transfer this frame is assigned to + rtt time.Duration // Last measured RTT for this frame mu sync.Mutex } @@ -31,6 +32,24 @@ func (sf *SendFrame) UpdateSendTime() { sf.mu.Unlock() } +func (sf *SendFrame) GetSendTime() time.Time { + sf.mu.Lock() + defer sf.mu.Unlock() + return sf.sendTime +} + +func (sf *SendFrame) SetRTT(rtt time.Duration) { + sf.mu.Lock() + sf.rtt = rtt + sf.mu.Unlock() +} + +func (sf *SendFrame) GetRTT() time.Duration { + sf.mu.Lock() + defer sf.mu.Unlock() + return sf.rtt +} + func (sf *SendFrame) FrameID() uint32 { return sf.frame.FrameID } diff --git a/pkg/sender/rtt.go b/pkg/sender/rtt.go new file mode 100644 index 0000000..789fb92 --- /dev/null +++ b/pkg/sender/rtt.go @@ -0,0 +1,86 @@ +package sender + +import ( + "math" + "sync" + "time" +) + +type RTTStats struct { + minRTT time.Duration + smoothedRTT time.Duration + rttVar time.Duration + latestRTT time.Duration + samples int + mu sync.Mutex +} + +func NewRTTStats() *RTTStats { + return &RTTStats{ + minRTT: time.Hour, // Initialize to a large value + smoothedRTT: 0, + rttVar: 0, + latestRTT: 0, + samples: 0, + } +} + +func (r *RTTStats) UpdateRTT(sendTime time.Time) { + r.mu.Lock() + defer r.mu.Unlock() + + rtt := time.Since(sendTime) + r.latestRTT = rtt + + if rtt < r.minRTT { + r.minRTT = rtt + } + + if r.samples == 0 { + r.smoothedRTT = rtt + r.rttVar = rtt / 2 + } else { + rttDelta := time.Duration(math.Abs(float64(r.smoothedRTT - rtt))) + r.rttVar = r.rttVar*3/4 + rttDelta/4 + + r.smoothedRTT = r.smoothedRTT*7/8 + rtt/8 + } + + r.samples++ +} + +func (r *RTTStats) GetSmoothedRTT() time.Duration { + r.mu.Lock() + defer r.mu.Unlock() + + if r.samples == 0 { + return time.Millisecond * 100 // Default value if no samples + } + return r.smoothedRTT +} + +func (r *RTTStats) GetRTTVariation() time.Duration { + r.mu.Lock() + defer r.mu.Unlock() + + if r.samples == 0 { + return time.Millisecond * 50 // Default value if no samples + } + return r.rttVar +} + +func (r *RTTStats) GetMinRTT() time.Duration { + r.mu.Lock() + defer r.mu.Unlock() + + if r.minRTT == time.Hour { + return time.Millisecond * 50 // Default value if no valid minimum + } + return r.minRTT +} + +func (r *RTTStats) GetLatestRTT() time.Duration { + r.mu.Lock() + defer r.mu.Unlock() + return r.latestRTT +} diff --git a/pkg/sender/sender.go b/pkg/sender/sender.go index fcc338e..a18ef77 100644 --- a/pkg/sender/sender.go +++ b/pkg/sender/sender.go @@ -23,6 +23,9 @@ type Sender struct { // each frame size frameSize int + minFrameSize int + maxFrameSize int + adaptiveFrameSizingEnabled bool // send src to remote Receiver src io.Reader @@ -45,6 +48,9 @@ type Sender struct { waitAcks map[uint32]*SendFrame bufferFrames []*SendFrame + rttStats *RTTStats + lastFrameSizeAdj time.Time + // 1 means all frames has been sent sendAll bool mu sync.Mutex @@ -62,9 +68,22 @@ func NewSender(id uint32, src io.Reader, frameSize int, maxBufferCount int) (*Se maxBufferCount = 100 } + minFrameSize := 1024 // 1KB minimum + maxFrameSize := 64 * 1024 // 64KB maximum + + if frameSize < minFrameSize { + frameSize = minFrameSize + } else if frameSize > maxFrameSize { + frameSize = maxFrameSize + } + + now := time.Now() s := &Sender{ id: id, frameSize: frameSize, + minFrameSize: minFrameSize, + maxFrameSize: maxFrameSize, + adaptiveFrameSizingEnabled: true, src: src, frameCh: make(chan *SendFrame), ackCh: make(chan *stream.Ack), @@ -77,6 +96,8 @@ func NewSender(id uint32, src io.Reader, frameSize int, maxBufferCount int) (*Se limiter: make(chan struct{}, maxBufferCount), waitAcks: make(map[uint32]*SendFrame), bufferFrames: make([]*SendFrame, 0), + rttStats: NewRTTStats(), + lastFrameSizeAdj: now, sendShutdown: shutdown.New(), ackShutdown: shutdown.New(), } @@ -93,6 +114,37 @@ func (sender *Sender) EnableDynamicAllocation() { sender.dynamicAllocationEnabled = true } +func (sender *Sender) EnableAdaptiveFrameSizing() { + sender.mu.Lock() + defer sender.mu.Unlock() + + sender.adaptiveFrameSizingEnabled = true +} + +func (sender *Sender) SetFrameSizeBounds(minSize, maxSize int) error { + if !stream.IsValidFrameSize(minSize) || !stream.IsValidFrameSize(maxSize) { + return fmt.Errorf("invalid frame size bounds") + } + + if minSize > maxSize { + return fmt.Errorf("minimum frame size cannot be larger than maximum") + } + + sender.mu.Lock() + defer sender.mu.Unlock() + + sender.minFrameSize = minSize + sender.maxFrameSize = maxSize + + if sender.frameSize < minSize { + sender.frameSize = minSize + } else if sender.frameSize > maxSize { + sender.frameSize = maxSize + } + + return nil +} + func (sender *Sender) HandleStream(s *stream.FrameStream) { sender.mu.Lock() if sender.sendAll { @@ -194,6 +246,44 @@ func (sender *Sender) loopSend() { for { <-sender.limiter + if sender.adaptiveFrameSizingEnabled { + now := time.Now() + if now.Sub(sender.lastFrameSizeAdj) > time.Second { + sender.mu.Lock() + + smoothedRTT := sender.rttStats.GetSmoothedRTT() + rttVar := sender.rttStats.GetRTTVariation() + minRTT := sender.rttStats.GetMinRTT() + + currentSize := sender.frameSize + newSize := currentSize + + // If network is stable (low RTT variation), increase frame size + if rttVar < smoothedRTT/4 && smoothedRTT < minRTT*1.5 { + // Network is stable, increase frame size + newSize = int(float64(currentSize) * 1.25) // Increase by 25% + + if newSize > sender.maxFrameSize { + newSize = sender.maxFrameSize + } + } else if rttVar > smoothedRTT/2 || smoothedRTT > minRTT*2 { + // Network is unstable or congested, decrease frame size + newSize = int(float64(currentSize) * 0.75) // Decrease by 25% + + if newSize < sender.minFrameSize { + newSize = sender.minFrameSize + } + } + + if newSize != currentSize { + sender.frameSize = newSize + } + + sender.lastFrameSizeAdj = now + sender.mu.Unlock() + } + } + // retry first var retryFrame *SendFrame sender.mu.Lock() @@ -213,8 +303,12 @@ func (sender *Sender) loopSend() { continue } + sender.mu.Lock() + currentFrameSize := sender.frameSize + sender.mu.Unlock() + // no retry frames, get a new frame from src - buf := make([]byte, sender.frameSize) + buf := make([]byte, currentFrameSize) n, err := sender.src.Read(buf) if err == io.EOF { // send last frame and it's buffer is nil @@ -296,6 +390,12 @@ func (sender *Sender) ackHandler() { sender.mu.Lock() waitSendFrame, ok := sender.waitAcks[ack.FrameID] if ok { + if !waitSendFrame.GetSendTime().IsZero() { + rtt := time.Since(waitSendFrame.GetSendTime()) + waitSendFrame.SetRTT(rtt) + sender.rttStats.UpdateRTT(waitSendFrame.GetSendTime()) + } + waitSendFrame.SetAck() delete(sender.waitAcks, ack.FrameID) diff --git a/pkg/sender/transfer.go b/pkg/sender/transfer.go index 7d9f840..c828fa6 100644 --- a/pkg/sender/transfer.go +++ b/pkg/sender/transfer.go @@ -22,6 +22,10 @@ type Transfer struct { startTime time.Time lastMetricTime time.Time currentThroughput float64 // bytes per second + + rttStats *RTTStats + congestionWindow int64 + lastCongestionAdj time.Time s *stream.FrameStream limiter *limit.Limiter @@ -49,6 +53,9 @@ func NewTransfer(id int, maxBufferCount int, s *stream.FrameStream, startTime: now, lastMetricTime: now, currentThroughput: 0, + rttStats: NewRTTStats(), + congestionWindow: int64(1), + lastCongestionAdj: now, s: s, limiter: limit.NewLimiter(int64(1)), frameCh: frameCh, @@ -105,7 +112,14 @@ func (t *Transfer) frameSender() { if t.inSlowStart { n = 2 * n } else { - n++ + rttVar := t.rttStats.GetRTTVariation() + smoothedRTT := t.rttStats.GetSmoothedRTT() + + if rttVar < smoothedRTT/4 { + n += n/4 + } else { + n++ + } } if n > t.maxBufferCount { @@ -130,10 +144,36 @@ func (t *Transfer) frameSender() { continue } + sf.UpdateSendTime() + t.mu.Lock() t.waitAcks[sf.FrameID()] = sf t.mu.Unlock() + if !t.inSlowStart && t.framesSent > 20 { + t.mu.Lock() + now := time.Now() + smoothedRTT := t.rttStats.GetSmoothedRTT() + rttTimeout := smoothedRTT * 3 // Timeout threshold + + for frameID, waitFrame := range t.waitAcks { + if frameID != sf.FrameID() && !waitFrame.sendTime.IsZero() { + elapsed := now.Sub(waitFrame.sendTime) + if elapsed > rttTimeout { + waitFrame.retryTimes++ + waitFrame.UpdateSendTime() // Update send time for retransmission + + err = t.s.WriteFrame(waitFrame.Frame()) + if err != nil { + t.mu.Unlock() + return + } + } + } + } + t.mu.Unlock() + } + err = t.s.WriteFrame(sf.Frame()) if err != nil { return @@ -154,6 +194,10 @@ func (t *Transfer) ackReceiver() { t.mu.Lock() sf, ok := t.waitAcks[ack.FrameID] if ok { + if !sf.sendTime.IsZero() { + t.rttStats.UpdateRTT(sf.sendTime) + } + t.framesSent++ if sf.Frame().Buf != nil { t.bytesTransferred += uint64(len(sf.Frame().Buf)) @@ -167,13 +211,34 @@ func (t *Transfer) ackReceiver() { // Calculate bytes per second t.currentThroughput = float64(t.bytesTransferred) / elapsedSeconds - currentLimit := t.limiter.LimitNum() - if t.currentThroughput > 0 { + if now.Sub(t.lastCongestionAdj) > 100*time.Millisecond { + smoothedRTT := t.rttStats.GetSmoothedRTT() + rttVar := t.rttStats.GetRTTVariation() + minRTT := t.rttStats.GetMinRTT() + + currentLimit := t.limiter.LimitNum() newLimit := currentLimit + if t.inSlowStart { newLimit = currentLimit * 2 + + if smoothedRTT > minRTT*2 && t.framesSent > 20 { + t.inSlowStart = false + newLimit = currentLimit + } } else { - newLimit = currentLimit + (currentLimit / 10) + if rttVar < smoothedRTT/4 { + newLimit = currentLimit + (currentLimit / 8) + } else { + newLimit = currentLimit + (currentLimit / 16) + } + + if smoothedRTT > minRTT*3 { + newLimit = currentLimit / 2 + if newLimit < 1 { + newLimit = 1 + } + } } // Cap at maxBufferCount @@ -182,7 +247,9 @@ func (t *Transfer) ackReceiver() { t.inSlowStart = false } + t.congestionWindow = newLimit t.limiter.SetLimit(newLimit) + t.lastCongestionAdj = now } t.lastMetricTime = now From a41d698fa10297378e8e4f7e1ce9f9f2bdd05145 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 28 May 2025 13:46:11 +0000 Subject: [PATCH 02/15] Fix code formatting issues Co-Authored-By: fatedier --- pkg/sender/frame.go | 2 +- pkg/sender/rtt.go | 6 ++-- pkg/sender/sender.go | 78 +++++++++++++++++++++--------------------- pkg/sender/transfer.go | 16 ++++----- 4 files changed, 51 insertions(+), 51 deletions(-) diff --git a/pkg/sender/frame.go b/pkg/sender/frame.go index d58083a..d8615b2 100644 --- a/pkg/sender/frame.go +++ b/pkg/sender/frame.go @@ -13,7 +13,7 @@ type SendFrame struct { sendTime time.Time retryTimes int hasAck bool - transferID int // ID of the transfer this frame is assigned to + transferID int // ID of the transfer this frame is assigned to rtt time.Duration // Last measured RTT for this frame mu sync.Mutex diff --git a/pkg/sender/rtt.go b/pkg/sender/rtt.go index 789fb92..1628131 100644 --- a/pkg/sender/rtt.go +++ b/pkg/sender/rtt.go @@ -52,7 +52,7 @@ func (r *RTTStats) UpdateRTT(sendTime time.Time) { func (r *RTTStats) GetSmoothedRTT() time.Duration { r.mu.Lock() defer r.mu.Unlock() - + if r.samples == 0 { return time.Millisecond * 100 // Default value if no samples } @@ -62,7 +62,7 @@ func (r *RTTStats) GetSmoothedRTT() time.Duration { func (r *RTTStats) GetRTTVariation() time.Duration { r.mu.Lock() defer r.mu.Unlock() - + if r.samples == 0 { return time.Millisecond * 50 // Default value if no samples } @@ -72,7 +72,7 @@ func (r *RTTStats) GetRTTVariation() time.Duration { func (r *RTTStats) GetMinRTT() time.Duration { r.mu.Lock() defer r.mu.Unlock() - + if r.minRTT == time.Hour { return time.Millisecond * 50 // Default value if no valid minimum } diff --git a/pkg/sender/sender.go b/pkg/sender/sender.go index a18ef77..554f9a6 100644 --- a/pkg/sender/sender.go +++ b/pkg/sender/sender.go @@ -22,9 +22,9 @@ type Sender struct { id uint32 // each frame size - frameSize int - minFrameSize int - maxFrameSize int + frameSize int + minFrameSize int + maxFrameSize int adaptiveFrameSizingEnabled bool // send src to remote Receiver @@ -48,7 +48,7 @@ type Sender struct { waitAcks map[uint32]*SendFrame bufferFrames []*SendFrame - rttStats *RTTStats + rttStats *RTTStats lastFrameSizeAdj time.Time // 1 means all frames has been sent @@ -68,9 +68,9 @@ func NewSender(id uint32, src io.Reader, frameSize int, maxBufferCount int) (*Se maxBufferCount = 100 } - minFrameSize := 1024 // 1KB minimum + minFrameSize := 1024 // 1KB minimum maxFrameSize := 64 * 1024 // 64KB maximum - + if frameSize < minFrameSize { frameSize = minFrameSize } else if frameSize > maxFrameSize { @@ -79,27 +79,27 @@ func NewSender(id uint32, src io.Reader, frameSize int, maxBufferCount int) (*Se now := time.Now() s := &Sender{ - id: id, - frameSize: frameSize, - minFrameSize: minFrameSize, - maxFrameSize: maxFrameSize, + id: id, + frameSize: frameSize, + minFrameSize: minFrameSize, + maxFrameSize: maxFrameSize, adaptiveFrameSizingEnabled: true, - src: src, - frameCh: make(chan *SendFrame), - ackCh: make(chan *stream.Ack), - dynamicAllocationEnabled: false, - transfers: make(map[int]*Transfer), - totalThroughput: 0, - allocationRatios: make(map[int]float64), - maxBufferCount: maxBufferCount, - retryFrames: make([]*SendFrame, 0), - limiter: make(chan struct{}, maxBufferCount), - waitAcks: make(map[uint32]*SendFrame), - bufferFrames: make([]*SendFrame, 0), - rttStats: NewRTTStats(), - lastFrameSizeAdj: now, - sendShutdown: shutdown.New(), - ackShutdown: shutdown.New(), + src: src, + frameCh: make(chan *SendFrame), + ackCh: make(chan *stream.Ack), + dynamicAllocationEnabled: false, + transfers: make(map[int]*Transfer), + totalThroughput: 0, + allocationRatios: make(map[int]float64), + maxBufferCount: maxBufferCount, + retryFrames: make([]*SendFrame, 0), + limiter: make(chan struct{}, maxBufferCount), + waitAcks: make(map[uint32]*SendFrame), + bufferFrames: make([]*SendFrame, 0), + rttStats: NewRTTStats(), + lastFrameSizeAdj: now, + sendShutdown: shutdown.New(), + ackShutdown: shutdown.New(), } for i := 0; i < maxBufferCount; i++ { s.limiter <- struct{}{} @@ -125,23 +125,23 @@ func (sender *Sender) SetFrameSizeBounds(minSize, maxSize int) error { if !stream.IsValidFrameSize(minSize) || !stream.IsValidFrameSize(maxSize) { return fmt.Errorf("invalid frame size bounds") } - + if minSize > maxSize { return fmt.Errorf("minimum frame size cannot be larger than maximum") } - + sender.mu.Lock() defer sender.mu.Unlock() - + sender.minFrameSize = minSize sender.maxFrameSize = maxSize - + if sender.frameSize < minSize { sender.frameSize = minSize } else if sender.frameSize > maxSize { sender.frameSize = maxSize } - + return nil } @@ -250,35 +250,35 @@ func (sender *Sender) loopSend() { now := time.Now() if now.Sub(sender.lastFrameSizeAdj) > time.Second { sender.mu.Lock() - + smoothedRTT := sender.rttStats.GetSmoothedRTT() rttVar := sender.rttStats.GetRTTVariation() minRTT := sender.rttStats.GetMinRTT() - + currentSize := sender.frameSize newSize := currentSize - + // If network is stable (low RTT variation), increase frame size if rttVar < smoothedRTT/4 && smoothedRTT < minRTT*1.5 { // Network is stable, increase frame size newSize = int(float64(currentSize) * 1.25) // Increase by 25% - + if newSize > sender.maxFrameSize { newSize = sender.maxFrameSize } } else if rttVar > smoothedRTT/2 || smoothedRTT > minRTT*2 { // Network is unstable or congested, decrease frame size newSize = int(float64(currentSize) * 0.75) // Decrease by 25% - + if newSize < sender.minFrameSize { newSize = sender.minFrameSize } } - + if newSize != currentSize { sender.frameSize = newSize } - + sender.lastFrameSizeAdj = now sender.mu.Unlock() } @@ -395,7 +395,7 @@ func (sender *Sender) ackHandler() { waitSendFrame.SetRTT(rtt) sender.rttStats.UpdateRTT(waitSendFrame.GetSendTime()) } - + waitSendFrame.SetAck() delete(sender.waitAcks, ack.FrameID) diff --git a/pkg/sender/transfer.go b/pkg/sender/transfer.go index c828fa6..36fa81d 100644 --- a/pkg/sender/transfer.go +++ b/pkg/sender/transfer.go @@ -22,7 +22,7 @@ type Transfer struct { startTime time.Time lastMetricTime time.Time currentThroughput float64 // bytes per second - + rttStats *RTTStats congestionWindow int64 lastCongestionAdj time.Time @@ -114,9 +114,9 @@ func (t *Transfer) frameSender() { } else { rttVar := t.rttStats.GetRTTVariation() smoothedRTT := t.rttStats.GetSmoothedRTT() - + if rttVar < smoothedRTT/4 { - n += n/4 + n += n / 4 } else { n++ } @@ -155,14 +155,14 @@ func (t *Transfer) frameSender() { now := time.Now() smoothedRTT := t.rttStats.GetSmoothedRTT() rttTimeout := smoothedRTT * 3 // Timeout threshold - + for frameID, waitFrame := range t.waitAcks { if frameID != sf.FrameID() && !waitFrame.sendTime.IsZero() { elapsed := now.Sub(waitFrame.sendTime) if elapsed > rttTimeout { waitFrame.retryTimes++ waitFrame.UpdateSendTime() // Update send time for retransmission - + err = t.s.WriteFrame(waitFrame.Frame()) if err != nil { t.mu.Unlock() @@ -215,13 +215,13 @@ func (t *Transfer) ackReceiver() { smoothedRTT := t.rttStats.GetSmoothedRTT() rttVar := t.rttStats.GetRTTVariation() minRTT := t.rttStats.GetMinRTT() - + currentLimit := t.limiter.LimitNum() newLimit := currentLimit if t.inSlowStart { newLimit = currentLimit * 2 - + if smoothedRTT > minRTT*2 && t.framesSent > 20 { t.inSlowStart = false newLimit = currentLimit @@ -232,7 +232,7 @@ func (t *Transfer) ackReceiver() { } else { newLimit = currentLimit + (currentLimit / 16) } - + if smoothedRTT > minRTT*3 { newLimit = currentLimit / 2 if newLimit < 1 { From c6fcd20d9712eff1ecce53dce60cc2b0986964a7 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 28 May 2025 13:47:20 +0000 Subject: [PATCH 03/15] Fix type conversion issue with time.Duration comparison Co-Authored-By: fatedier --- pkg/sender/sender.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sender/sender.go b/pkg/sender/sender.go index 554f9a6..5026be8 100644 --- a/pkg/sender/sender.go +++ b/pkg/sender/sender.go @@ -259,7 +259,7 @@ func (sender *Sender) loopSend() { newSize := currentSize // If network is stable (low RTT variation), increase frame size - if rttVar < smoothedRTT/4 && smoothedRTT < minRTT*1.5 { + if rttVar < smoothedRTT/4 && smoothedRTT < minRTT+minRTT/2 { // Network is stable, increase frame size newSize = int(float64(currentSize) * 1.25) // Increase by 25% From edc5de8955b71f676dbbb97f9f67dd9997f82f12 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 28 May 2025 14:03:53 +0000 Subject: [PATCH 04/15] Fix deadlock in predictive retransmission by separating lock acquisition from network operations Co-Authored-By: fatedier --- pkg/sender/transfer.go | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/pkg/sender/transfer.go b/pkg/sender/transfer.go index 36fa81d..36c87ec 100644 --- a/pkg/sender/transfer.go +++ b/pkg/sender/transfer.go @@ -151,27 +151,39 @@ func (t *Transfer) frameSender() { t.mu.Unlock() if !t.inSlowStart && t.framesSent > 20 { + var framesToRetry []*SendFrame + t.mu.Lock() now := time.Now() smoothedRTT := t.rttStats.GetSmoothedRTT() rttTimeout := smoothedRTT * 3 // Timeout threshold - + + retryCount := 0 + maxRetryPerCycle := 5 + for frameID, waitFrame := range t.waitAcks { - if frameID != sf.FrameID() && !waitFrame.sendTime.IsZero() { + if frameID != sf.FrameID() && !waitFrame.sendTime.IsZero() && waitFrame.retryTimes < 3 { elapsed := now.Sub(waitFrame.sendTime) if elapsed > rttTimeout { - waitFrame.retryTimes++ - waitFrame.UpdateSendTime() // Update send time for retransmission - - err = t.s.WriteFrame(waitFrame.Frame()) - if err != nil { - t.mu.Unlock() - return + framesToRetry = append(framesToRetry, waitFrame) + retryCount++ + if retryCount >= maxRetryPerCycle { + break } } } } t.mu.Unlock() + + for _, waitFrame := range framesToRetry { + waitFrame.retryTimes++ + waitFrame.UpdateSendTime() + + err = t.s.WriteFrame(waitFrame.Frame()) + if err != nil { + return + } + } } err = t.s.WriteFrame(sf.Frame()) From d8b1a1d803923b3b715c37ddf8c9210a0d03d65b Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 28 May 2025 14:05:43 +0000 Subject: [PATCH 05/15] Fix Go formatting in transfer.go Co-Authored-By: fatedier --- pkg/sender/transfer.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/sender/transfer.go b/pkg/sender/transfer.go index 36c87ec..9e83c8e 100644 --- a/pkg/sender/transfer.go +++ b/pkg/sender/transfer.go @@ -152,15 +152,15 @@ func (t *Transfer) frameSender() { if !t.inSlowStart && t.framesSent > 20 { var framesToRetry []*SendFrame - + t.mu.Lock() now := time.Now() smoothedRTT := t.rttStats.GetSmoothedRTT() rttTimeout := smoothedRTT * 3 // Timeout threshold - + retryCount := 0 maxRetryPerCycle := 5 - + for frameID, waitFrame := range t.waitAcks { if frameID != sf.FrameID() && !waitFrame.sendTime.IsZero() && waitFrame.retryTimes < 3 { elapsed := now.Sub(waitFrame.sendTime) @@ -174,11 +174,11 @@ func (t *Transfer) frameSender() { } } t.mu.Unlock() - + for _, waitFrame := range framesToRetry { waitFrame.retryTimes++ waitFrame.UpdateSendTime() - + err = t.s.WriteFrame(waitFrame.Frame()) if err != nil { return From c3eda9dd925706500aef0c2178479cfd9640f97c Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 28 May 2025 14:23:08 +0000 Subject: [PATCH 06/15] Implement dynamic RTT probing for network environment detection and optimization Co-Authored-By: fatedier --- pkg/sender/rtt.go | 107 +++++++++++++++++++++++++++++++++++------ pkg/sender/sender.go | 47 +++++++++++++----- pkg/sender/transfer.go | 79 ++++++++++++++++++++++++------ 3 files changed, 194 insertions(+), 39 deletions(-) diff --git a/pkg/sender/rtt.go b/pkg/sender/rtt.go index 1628131..91fb824 100644 --- a/pkg/sender/rtt.go +++ b/pkg/sender/rtt.go @@ -6,22 +6,44 @@ import ( "time" ) +type NetworkEnvironmentType int + +const ( + NetworkEnvironmentUnknown NetworkEnvironmentType = iota + NetworkEnvironmentLocal + NetworkEnvironmentRemote +) + +var RTTThresholds = struct { + LocalThreshold time.Duration + RemoteThreshold time.Duration +}{ + LocalThreshold: 5 * time.Millisecond, + RemoteThreshold: 50 * time.Millisecond, +} + type RTTStats struct { - minRTT time.Duration - smoothedRTT time.Duration - rttVar time.Duration - latestRTT time.Duration - samples int - mu sync.Mutex + minRTT time.Duration + smoothedRTT time.Duration + rttVar time.Duration + latestRTT time.Duration + samples int + networkEnvironment NetworkEnvironmentType + environmentDetected bool + lastEnvCheck time.Time + mu sync.Mutex } func NewRTTStats() *RTTStats { return &RTTStats{ - minRTT: time.Hour, // Initialize to a large value - smoothedRTT: 0, - rttVar: 0, - latestRTT: 0, - samples: 0, + minRTT: time.Hour, // Initialize to a large value + smoothedRTT: 0, + rttVar: 0, + latestRTT: 0, + samples: 0, + networkEnvironment: NetworkEnvironmentUnknown, + environmentDetected: false, + lastEnvCheck: time.Now(), } } @@ -47,12 +69,37 @@ func (r *RTTStats) UpdateRTT(sendTime time.Time) { } r.samples++ + + if !r.environmentDetected || r.samples%10 == 0 { + r.detectNetworkEnvironment() + } +} + +func (r *RTTStats) detectNetworkEnvironment() { + if r.samples < 5 { + return + } + + if r.smoothedRTT <= RTTThresholds.LocalThreshold { + r.networkEnvironment = NetworkEnvironmentLocal + } else if r.smoothedRTT >= RTTThresholds.RemoteThreshold { + r.networkEnvironment = NetworkEnvironmentRemote + } else { + if r.minRTT <= RTTThresholds.LocalThreshold*2 { + r.networkEnvironment = NetworkEnvironmentLocal + } else { + r.networkEnvironment = NetworkEnvironmentRemote + } + } + + r.environmentDetected = true + r.lastEnvCheck = time.Now() } func (r *RTTStats) GetSmoothedRTT() time.Duration { r.mu.Lock() defer r.mu.Unlock() - + if r.samples == 0 { return time.Millisecond * 100 // Default value if no samples } @@ -62,7 +109,7 @@ func (r *RTTStats) GetSmoothedRTT() time.Duration { func (r *RTTStats) GetRTTVariation() time.Duration { r.mu.Lock() defer r.mu.Unlock() - + if r.samples == 0 { return time.Millisecond * 50 // Default value if no samples } @@ -72,7 +119,7 @@ func (r *RTTStats) GetRTTVariation() time.Duration { func (r *RTTStats) GetMinRTT() time.Duration { r.mu.Lock() defer r.mu.Unlock() - + if r.minRTT == time.Hour { return time.Millisecond * 50 // Default value if no valid minimum } @@ -84,3 +131,35 @@ func (r *RTTStats) GetLatestRTT() time.Duration { defer r.mu.Unlock() return r.latestRTT } + +func (r *RTTStats) GetNetworkEnvironment() NetworkEnvironmentType { + r.mu.Lock() + defer r.mu.Unlock() + return r.networkEnvironment +} + +func (r *RTTStats) IsLocalNetwork() bool { + r.mu.Lock() + defer r.mu.Unlock() + return r.networkEnvironment == NetworkEnvironmentLocal +} + +func (r *RTTStats) IsRemoteNetwork() bool { + r.mu.Lock() + defer r.mu.Unlock() + return r.networkEnvironment == NetworkEnvironmentRemote +} + +func (r *RTTStats) GetAdaptiveRTTMultiplier() float64 { + r.mu.Lock() + defer r.mu.Unlock() + + switch r.networkEnvironment { + case NetworkEnvironmentLocal: + return 0.5 // More aggressive for local networks + case NetworkEnvironmentRemote: + return 2.0 // More conservative for remote networks + default: + return 1.0 // Default multiplier + } +} diff --git a/pkg/sender/sender.go b/pkg/sender/sender.go index 5026be8..bf9f45d 100644 --- a/pkg/sender/sender.go +++ b/pkg/sender/sender.go @@ -254,27 +254,52 @@ func (sender *Sender) loopSend() { smoothedRTT := sender.rttStats.GetSmoothedRTT() rttVar := sender.rttStats.GetRTTVariation() minRTT := sender.rttStats.GetMinRTT() + rttMultiplier := sender.rttStats.GetAdaptiveRTTMultiplier() currentSize := sender.frameSize newSize := currentSize - // If network is stable (low RTT variation), increase frame size - if rttVar < smoothedRTT/4 && smoothedRTT < minRTT+minRTT/2 { - // Network is stable, increase frame size - newSize = int(float64(currentSize) * 1.25) // Increase by 25% + isLocalNetwork := sender.rttStats.IsLocalNetwork() + isRemoteNetwork := sender.rttStats.IsRemoteNetwork() - if newSize > sender.maxFrameSize { - newSize = sender.maxFrameSize + if isLocalNetwork { + if rttVar < smoothedRTT/4 { + newSize = int(float64(currentSize) * 1.5) // Increase by 50% + } else if rttVar < smoothedRTT/2 { + newSize = int(float64(currentSize) * 1.25) // Increase by 25% + } else if rttVar > smoothedRTT/2 { + newSize = int(float64(currentSize) * 0.9) // Decrease by 10% } - } else if rttVar > smoothedRTT/2 || smoothedRTT > minRTT*2 { - // Network is unstable or congested, decrease frame size - newSize = int(float64(currentSize) * 0.75) // Decrease by 25% + } else if isRemoteNetwork { + if rttVar < smoothedRTT/8 && smoothedRTT < minRTT+minRTT/4 { + newSize = int(float64(currentSize) * 1.1) // Increase by 10% + } else if rttVar > smoothedRTT/4 || smoothedRTT > minRTT*1.5 { + newSize = int(float64(currentSize) * 0.75) // Decrease by 25% + } + } else { + if rttVar < smoothedRTT/4 && smoothedRTT < minRTT+minRTT/2 { + // Network is stable, increase frame size + newSize = int(float64(currentSize) * (1.0 + 0.25/rttMultiplier)) + + if newSize > sender.maxFrameSize { + newSize = sender.maxFrameSize + } + } else if rttVar > smoothedRTT/2 || smoothedRTT > minRTT*2 { + // Network is unstable or congested, decrease frame size + newSize = int(float64(currentSize) * (1.0 - 0.25*rttMultiplier)) - if newSize < sender.minFrameSize { - newSize = sender.minFrameSize + if newSize < sender.minFrameSize { + newSize = sender.minFrameSize + } } } + if newSize > sender.maxFrameSize { + newSize = sender.maxFrameSize + } else if newSize < sender.minFrameSize { + newSize = sender.minFrameSize + } + if newSize != currentSize { sender.frameSize = newSize } diff --git a/pkg/sender/transfer.go b/pkg/sender/transfer.go index 9e83c8e..0d849c5 100644 --- a/pkg/sender/transfer.go +++ b/pkg/sender/transfer.go @@ -227,28 +227,79 @@ func (t *Transfer) ackReceiver() { smoothedRTT := t.rttStats.GetSmoothedRTT() rttVar := t.rttStats.GetRTTVariation() minRTT := t.rttStats.GetMinRTT() + rttMultiplier := t.rttStats.GetAdaptiveRTTMultiplier() + isLocalNetwork := t.rttStats.IsLocalNetwork() + isRemoteNetwork := t.rttStats.IsRemoteNetwork() currentLimit := t.limiter.LimitNum() newLimit := currentLimit if t.inSlowStart { - newLimit = currentLimit * 2 - - if smoothedRTT > minRTT*2 && t.framesSent > 20 { - t.inSlowStart = false - newLimit = currentLimit + if isLocalNetwork { + newLimit = currentLimit * 3 + + if smoothedRTT > minRTT*1.5 && t.framesSent > 10 { + t.inSlowStart = false + newLimit = currentLimit * 2 + } + } else if isRemoteNetwork { + newLimit = currentLimit * 2 + + if smoothedRTT > minRTT*3 && t.framesSent > 30 { + t.inSlowStart = false + newLimit = currentLimit + } + } else { + newLimit = currentLimit * 2 + + if smoothedRTT > minRTT*(2*rttMultiplier) && t.framesSent > 20 { + t.inSlowStart = false + newLimit = currentLimit + } } } else { - if rttVar < smoothedRTT/4 { - newLimit = currentLimit + (currentLimit / 8) + if isLocalNetwork { + if rttVar < smoothedRTT/4 { + newLimit = currentLimit + (currentLimit / 4) + } else if rttVar < smoothedRTT/2 { + newLimit = currentLimit + (currentLimit / 6) + } else { + newLimit = currentLimit + (currentLimit / 10) + } + + if smoothedRTT > minRTT*2 { + newLimit = currentLimit * 3 / 4 + if newLimit < 1 { + newLimit = 1 + } + } + } else if isRemoteNetwork { + if rttVar < smoothedRTT/8 { + newLimit = currentLimit + (currentLimit / 10) + } else if rttVar < smoothedRTT/4 { + newLimit = currentLimit + (currentLimit / 16) + } else { + newLimit = currentLimit + (currentLimit / 32) + } + + if smoothedRTT > minRTT*2 { + newLimit = currentLimit / 2 + if newLimit < 1 { + newLimit = 1 + } + } } else { - newLimit = currentLimit + (currentLimit / 16) - } - - if smoothedRTT > minRTT*3 { - newLimit = currentLimit / 2 - if newLimit < 1 { - newLimit = 1 + if rttVar < smoothedRTT/4 { + newLimit = currentLimit + int64(float64(currentLimit) / (8 * rttMultiplier)) + } else { + newLimit = currentLimit + int64(float64(currentLimit) / (16 * rttMultiplier)) + } + + if smoothedRTT > minRTT*3 { + newLimit = int64(float64(currentLimit) / (2 * rttMultiplier)) + if newLimit < 1 { + newLimit = 1 + } } } } From 2b9fc8c1032c97b36764d9cb9495c13ce563ae01 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 28 May 2025 14:29:01 +0000 Subject: [PATCH 07/15] Fix type conversion issues between time.Duration and float64 Co-Authored-By: fatedier --- pkg/sender/rtt.go | 10 +++++----- pkg/sender/sender.go | 4 ++-- pkg/sender/transfer.go | 29 +++++++++++++++-------------- 3 files changed, 22 insertions(+), 21 deletions(-) diff --git a/pkg/sender/rtt.go b/pkg/sender/rtt.go index 91fb824..162d244 100644 --- a/pkg/sender/rtt.go +++ b/pkg/sender/rtt.go @@ -15,7 +15,7 @@ const ( ) var RTTThresholds = struct { - LocalThreshold time.Duration + LocalThreshold time.Duration RemoteThreshold time.Duration }{ LocalThreshold: 5 * time.Millisecond, @@ -99,7 +99,7 @@ func (r *RTTStats) detectNetworkEnvironment() { func (r *RTTStats) GetSmoothedRTT() time.Duration { r.mu.Lock() defer r.mu.Unlock() - + if r.samples == 0 { return time.Millisecond * 100 // Default value if no samples } @@ -109,7 +109,7 @@ func (r *RTTStats) GetSmoothedRTT() time.Duration { func (r *RTTStats) GetRTTVariation() time.Duration { r.mu.Lock() defer r.mu.Unlock() - + if r.samples == 0 { return time.Millisecond * 50 // Default value if no samples } @@ -119,7 +119,7 @@ func (r *RTTStats) GetRTTVariation() time.Duration { func (r *RTTStats) GetMinRTT() time.Duration { r.mu.Lock() defer r.mu.Unlock() - + if r.minRTT == time.Hour { return time.Millisecond * 50 // Default value if no valid minimum } @@ -153,7 +153,7 @@ func (r *RTTStats) IsRemoteNetwork() bool { func (r *RTTStats) GetAdaptiveRTTMultiplier() float64 { r.mu.Lock() defer r.mu.Unlock() - + switch r.networkEnvironment { case NetworkEnvironmentLocal: return 0.5 // More aggressive for local networks diff --git a/pkg/sender/sender.go b/pkg/sender/sender.go index bf9f45d..fa62e3c 100644 --- a/pkg/sender/sender.go +++ b/pkg/sender/sender.go @@ -273,7 +273,7 @@ func (sender *Sender) loopSend() { } else if isRemoteNetwork { if rttVar < smoothedRTT/8 && smoothedRTT < minRTT+minRTT/4 { newSize = int(float64(currentSize) * 1.1) // Increase by 10% - } else if rttVar > smoothedRTT/4 || smoothedRTT > minRTT*1.5 { + } else if rttVar > smoothedRTT/4 || smoothedRTT > time.Duration(float64(minRTT)*1.5) { newSize = int(float64(currentSize) * 0.75) // Decrease by 25% } } else { @@ -284,7 +284,7 @@ func (sender *Sender) loopSend() { if newSize > sender.maxFrameSize { newSize = sender.maxFrameSize } - } else if rttVar > smoothedRTT/2 || smoothedRTT > minRTT*2 { + } else if rttVar > smoothedRTT/2 || smoothedRTT > time.Duration(float64(minRTT)*2) { // Network is unstable or congested, decrease frame size newSize = int(float64(currentSize) * (1.0 - 0.25*rttMultiplier)) diff --git a/pkg/sender/transfer.go b/pkg/sender/transfer.go index 0d849c5..d9ae221 100644 --- a/pkg/sender/transfer.go +++ b/pkg/sender/transfer.go @@ -237,22 +237,23 @@ func (t *Transfer) ackReceiver() { if t.inSlowStart { if isLocalNetwork { newLimit = currentLimit * 3 - - if smoothedRTT > minRTT*1.5 && t.framesSent > 10 { + + if smoothedRTT > time.Duration(float64(minRTT)*1.5) && t.framesSent > 10 { t.inSlowStart = false newLimit = currentLimit * 2 } } else if isRemoteNetwork { newLimit = currentLimit * 2 - - if smoothedRTT > minRTT*3 && t.framesSent > 30 { + + if smoothedRTT > time.Duration(float64(minRTT)*3) && t.framesSent > 30 { t.inSlowStart = false newLimit = currentLimit } } else { newLimit = currentLimit * 2 - - if smoothedRTT > minRTT*(2*rttMultiplier) && t.framesSent > 20 { + + multiplier := time.Duration(int64(2 * rttMultiplier)) + if smoothedRTT > minRTT*multiplier && t.framesSent > 20 { t.inSlowStart = false newLimit = currentLimit } @@ -266,8 +267,8 @@ func (t *Transfer) ackReceiver() { } else { newLimit = currentLimit + (currentLimit / 10) } - - if smoothedRTT > minRTT*2 { + + if smoothedRTT > time.Duration(float64(minRTT)*2) { newLimit = currentLimit * 3 / 4 if newLimit < 1 { newLimit = 1 @@ -281,8 +282,8 @@ func (t *Transfer) ackReceiver() { } else { newLimit = currentLimit + (currentLimit / 32) } - - if smoothedRTT > minRTT*2 { + + if smoothedRTT > time.Duration(float64(minRTT)*2) { newLimit = currentLimit / 2 if newLimit < 1 { newLimit = 1 @@ -290,12 +291,12 @@ func (t *Transfer) ackReceiver() { } } else { if rttVar < smoothedRTT/4 { - newLimit = currentLimit + int64(float64(currentLimit) / (8 * rttMultiplier)) + newLimit = currentLimit + int64(float64(currentLimit)/(8*rttMultiplier)) } else { - newLimit = currentLimit + int64(float64(currentLimit) / (16 * rttMultiplier)) + newLimit = currentLimit + int64(float64(currentLimit)/(16*rttMultiplier)) } - - if smoothedRTT > minRTT*3 { + + if smoothedRTT > time.Duration(float64(minRTT)*3) { newLimit = int64(float64(currentLimit) / (2 * rttMultiplier)) if newLimit < 1 { newLimit = 1 From e89ab8dfb5835780d92a500cd9819a97c026902a Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 28 May 2025 14:35:47 +0000 Subject: [PATCH 08/15] Enable adaptive frame sizing when dynamic allocation is enabled Co-Authored-By: fatedier --- client/send.go | 1 + 1 file changed, 1 insertion(+) diff --git a/client/send.go b/client/send.go index efb629d..de95cf1 100644 --- a/client/send.go +++ b/client/send.go @@ -93,6 +93,7 @@ func (svc *Service) sendFile(id string, filePath string) error { fmt.Println("Enabling dynamic frame allocation") } s.EnableDynamicAllocation() + s.EnableAdaptiveFrameSizing() } for _, worker := range m.Workers { From 2c1331e99714eb479de69ca1c9b050bb616d7d78 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 28 May 2025 14:38:24 +0000 Subject: [PATCH 09/15] Implement dynamic RTT probing with network environment detection and worker-aware congestion control Co-Authored-By: fatedier --- pkg/sender/rtt.go | 40 ++++++++++++-- pkg/sender/transfer.go | 117 ++++++++++++++++++++++++++++++++++------- 2 files changed, 132 insertions(+), 25 deletions(-) diff --git a/pkg/sender/rtt.go b/pkg/sender/rtt.go index 162d244..4c3b945 100644 --- a/pkg/sender/rtt.go +++ b/pkg/sender/rtt.go @@ -86,7 +86,15 @@ func (r *RTTStats) detectNetworkEnvironment() { r.networkEnvironment = NetworkEnvironmentRemote } else { if r.minRTT <= RTTThresholds.LocalThreshold*2 { - r.networkEnvironment = NetworkEnvironmentLocal + if r.rttVar < r.smoothedRTT/4 { + r.networkEnvironment = NetworkEnvironmentLocal + } else { + if r.latestRTT < r.smoothedRTT { + r.networkEnvironment = NetworkEnvironmentLocal + } else { + r.networkEnvironment = NetworkEnvironmentRemote + } + } } else { r.networkEnvironment = NetworkEnvironmentRemote } @@ -153,13 +161,35 @@ func (r *RTTStats) IsRemoteNetwork() bool { func (r *RTTStats) GetAdaptiveRTTMultiplier() float64 { r.mu.Lock() defer r.mu.Unlock() - + + // Base multiplier based on network environment + var baseMultiplier float64 switch r.networkEnvironment { case NetworkEnvironmentLocal: - return 0.5 // More aggressive for local networks + baseMultiplier = 0.5 // More aggressive for local networks case NetworkEnvironmentRemote: - return 2.0 // More conservative for remote networks + baseMultiplier = 2.0 // More conservative for remote networks default: - return 1.0 // Default multiplier + baseMultiplier = 1.0 // Default multiplier + } + + if r.samples < 10 { + return baseMultiplier // Not enough samples to make adjustments + } + + stabilityRatio := float64(r.rttVar) / float64(r.smoothedRTT) + + if stabilityRatio < 0.1 { + return baseMultiplier * 0.8 + } else if stabilityRatio > 0.3 { + return baseMultiplier * 1.5 + } + + if r.latestRTT < r.smoothedRTT { + return baseMultiplier * 0.9 + } else if r.latestRTT > r.smoothedRTT*1.2 { + return baseMultiplier * 1.3 } + + return baseMultiplier } diff --git a/pkg/sender/transfer.go b/pkg/sender/transfer.go index d9ae221..ee31b16 100644 --- a/pkg/sender/transfer.go +++ b/pkg/sender/transfer.go @@ -259,7 +259,53 @@ func (t *Transfer) ackReceiver() { } } } else { - if isLocalNetwork { + workerID := int(t.id) + isSlowWorker := false + isFastWorker := false + + if len(s.transfers) > 1 && s.totalThroughput > 0 { + s.transfersMu.RLock() + var maxThroughput, minThroughput float64 + maxThroughput = 0 + minThroughput = float64(^uint(0) >> 1) // Max int value + + for _, tr := range s.transfers { + if tr.currentThroughput > maxThroughput { + maxThroughput = tr.currentThroughput + } + if tr.currentThroughput > 0 && tr.currentThroughput < minThroughput { + minThroughput = tr.currentThroughput + } + } + + if minThroughput != float64(^uint(0)>>1) && maxThroughput > 0 { + if t.currentThroughput == minThroughput { + isSlowWorker = true + } else if t.currentThroughput == maxThroughput { + isFastWorker = true + } + } + s.transfersMu.RUnlock() + } + + if isLocalNetwork { + if isSlowWorker { + if rttVar < smoothedRTT/4 { + newLimit = currentLimit + (currentLimit / 2) + } else if rttVar < smoothedRTT/2 { + newLimit = currentLimit + (currentLimit / 3) + } else { + newLimit = currentLimit + (currentLimit / 5) + } + } else if isFastWorker { + if rttVar < smoothedRTT/4 { + newLimit = currentLimit + (currentLimit / 3) + } else if rttVar < smoothedRTT/2 { + newLimit = currentLimit + (currentLimit / 5) + } else { + newLimit = currentLimit + (currentLimit / 8) + } + } else { if rttVar < smoothedRTT/4 { newLimit = currentLimit + (currentLimit / 4) } else if rttVar < smoothedRTT/2 { @@ -267,14 +313,28 @@ func (t *Transfer) ackReceiver() { } else { newLimit = currentLimit + (currentLimit / 10) } + } - if smoothedRTT > time.Duration(float64(minRTT)*2) { + if smoothedRTT > time.Duration(float64(minRTT)*2) { + if isSlowWorker { + newLimit = currentLimit * 4 / 5 + } else { newLimit = currentLimit * 3 / 4 - if newLimit < 1 { - newLimit = 1 - } } - } else if isRemoteNetwork { + if newLimit < 1 { + newLimit = 1 + } + } + } else if isRemoteNetwork { + if isSlowWorker { + if rttVar < smoothedRTT/8 { + newLimit = currentLimit + (currentLimit / 8) + } else if rttVar < smoothedRTT/4 { + newLimit = currentLimit + (currentLimit / 12) + } else { + newLimit = currentLimit + (currentLimit / 20) + } + } else { if rttVar < smoothedRTT/8 { newLimit = currentLimit + (currentLimit / 10) } else if rttVar < smoothedRTT/4 { @@ -282,28 +342,45 @@ func (t *Transfer) ackReceiver() { } else { newLimit = currentLimit + (currentLimit / 32) } + } - if smoothedRTT > time.Duration(float64(minRTT)*2) { + if smoothedRTT > time.Duration(float64(minRTT)*2) { + if isSlowWorker { + newLimit = currentLimit * 3 / 5 + } else { newLimit = currentLimit / 2 - if newLimit < 1 { - newLimit = 1 - } } - } else { - if rttVar < smoothedRTT/4 { - newLimit = currentLimit + int64(float64(currentLimit)/(8*rttMultiplier)) - } else { - newLimit = currentLimit + int64(float64(currentLimit)/(16*rttMultiplier)) + if newLimit < 1 { + newLimit = 1 } + } + } else { + rttMultiplier := t.rttStats.GetAdaptiveRTTMultiplier() + + if isSlowWorker { + rttMultiplier *= 0.8 + } else if isFastWorker { + rttMultiplier *= 1.2 + } + + if rttVar < smoothedRTT/4 { + newLimit = currentLimit + int64(float64(currentLimit)/(6*rttMultiplier)) + } else { + newLimit = currentLimit + int64(float64(currentLimit)/(12*rttMultiplier)) + } - if smoothedRTT > time.Duration(float64(minRTT)*3) { - newLimit = int64(float64(currentLimit) / (2 * rttMultiplier)) - if newLimit < 1 { - newLimit = 1 - } + if smoothedRTT > time.Duration(float64(minRTT)*3) { + backoffFactor := 2.0 * rttMultiplier + if isSlowWorker { + backoffFactor *= 0.8 // Less aggressive backoff for slow workers + } + newLimit = int64(float64(currentLimit) / backoffFactor) + if newLimit < 1 { + newLimit = 1 } } } + } // Cap at maxBufferCount if newLimit > int64(t.maxBufferCount) { From 8f5097df5a454fd73eb71adca40c444800aa9e53 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 28 May 2025 14:40:12 +0000 Subject: [PATCH 10/15] Fix build errors in dynamic RTT probing implementation Co-Authored-By: fatedier --- pkg/sender/rtt.go | 2 +- pkg/sender/transfer.go | 34 +++++++++++++--------------------- 2 files changed, 14 insertions(+), 22 deletions(-) diff --git a/pkg/sender/rtt.go b/pkg/sender/rtt.go index 4c3b945..72ea4c1 100644 --- a/pkg/sender/rtt.go +++ b/pkg/sender/rtt.go @@ -187,7 +187,7 @@ func (r *RTTStats) GetAdaptiveRTTMultiplier() float64 { if r.latestRTT < r.smoothedRTT { return baseMultiplier * 0.9 - } else if r.latestRTT > r.smoothedRTT*1.2 { + } else if r.latestRTT > time.Duration(float64(r.smoothedRTT)*1.2) { return baseMultiplier * 1.3 } diff --git a/pkg/sender/transfer.go b/pkg/sender/transfer.go index ee31b16..17dc12e 100644 --- a/pkg/sender/transfer.go +++ b/pkg/sender/transfer.go @@ -259,33 +259,26 @@ func (t *Transfer) ackReceiver() { } } } else { - workerID := int(t.id) isSlowWorker := false isFastWorker := false - if len(s.transfers) > 1 && s.totalThroughput > 0 { - s.transfersMu.RLock() - var maxThroughput, minThroughput float64 - maxThroughput = 0 - minThroughput = float64(^uint(0) >> 1) // Max int value + if t.framesSent > 20 { + // Calculate expected throughput based on RTT + expectedThroughput := float64(0) - for _, tr := range s.transfers { - if tr.currentThroughput > maxThroughput { - maxThroughput = tr.currentThroughput - } - if tr.currentThroughput > 0 && tr.currentThroughput < minThroughput { - minThroughput = tr.currentThroughput - } + if isLocalNetwork { + expectedThroughput = 500 * 1024 // 500KB/s as a reference point + } else if isRemoteNetwork { + expectedThroughput = 200 * 1024 // 200KB/s as a reference point + } else { + expectedThroughput = 300 * 1024 // 300KB/s as a reference point } - if minThroughput != float64(^uint(0)>>1) && maxThroughput > 0 { - if t.currentThroughput == minThroughput { - isSlowWorker = true - } else if t.currentThroughput == maxThroughput { - isFastWorker = true - } + if t.currentThroughput < expectedThroughput*0.5 { + isSlowWorker = true + } else if t.currentThroughput > expectedThroughput*1.5 { + isFastWorker = true } - s.transfersMu.RUnlock() } if isLocalNetwork { @@ -380,7 +373,6 @@ func (t *Transfer) ackReceiver() { } } } - } // Cap at maxBufferCount if newLimit > int64(t.maxBufferCount) { From 5d3eef30ce64f1b865592b393dac2f9303bc6d36 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 28 May 2025 14:42:24 +0000 Subject: [PATCH 11/15] Fix indentation in transfer.go for dynamic RTT probing Co-Authored-By: fatedier --- pkg/sender/transfer.go | 192 ++++++++++++++++++++--------------------- 1 file changed, 96 insertions(+), 96 deletions(-) diff --git a/pkg/sender/transfer.go b/pkg/sender/transfer.go index 17dc12e..bdfe646 100644 --- a/pkg/sender/transfer.go +++ b/pkg/sender/transfer.go @@ -259,120 +259,120 @@ func (t *Transfer) ackReceiver() { } } } else { - isSlowWorker := false - isFastWorker := false - - if t.framesSent > 20 { - // Calculate expected throughput based on RTT - expectedThroughput := float64(0) - - if isLocalNetwork { - expectedThroughput = 500 * 1024 // 500KB/s as a reference point - } else if isRemoteNetwork { - expectedThroughput = 200 * 1024 // 200KB/s as a reference point - } else { - expectedThroughput = 300 * 1024 // 300KB/s as a reference point - } + isSlowWorker := false + isFastWorker := false - if t.currentThroughput < expectedThroughput*0.5 { - isSlowWorker = true - } else if t.currentThroughput > expectedThroughput*1.5 { - isFastWorker = true - } - } - - if isLocalNetwork { - if isSlowWorker { - if rttVar < smoothedRTT/4 { - newLimit = currentLimit + (currentLimit / 2) - } else if rttVar < smoothedRTT/2 { - newLimit = currentLimit + (currentLimit / 3) + if t.framesSent > 20 { + // Calculate expected throughput based on RTT + expectedThroughput := float64(0) + + if isLocalNetwork { + expectedThroughput = 500 * 1024 // 500KB/s as a reference point + } else if isRemoteNetwork { + expectedThroughput = 200 * 1024 // 200KB/s as a reference point } else { - newLimit = currentLimit + (currentLimit / 5) + expectedThroughput = 300 * 1024 // 300KB/s as a reference point } - } else if isFastWorker { - if rttVar < smoothedRTT/4 { - newLimit = currentLimit + (currentLimit / 3) - } else if rttVar < smoothedRTT/2 { - newLimit = currentLimit + (currentLimit / 5) - } else { - newLimit = currentLimit + (currentLimit / 8) - } - } else { - if rttVar < smoothedRTT/4 { - newLimit = currentLimit + (currentLimit / 4) - } else if rttVar < smoothedRTT/2 { - newLimit = currentLimit + (currentLimit / 6) - } else { - newLimit = currentLimit + (currentLimit / 10) + + if t.currentThroughput < expectedThroughput*0.5 { + isSlowWorker = true + } else if t.currentThroughput > expectedThroughput*1.5 { + isFastWorker = true } } - - if smoothedRTT > time.Duration(float64(minRTT)*2) { + + if isLocalNetwork { if isSlowWorker { - newLimit = currentLimit * 4 / 5 - } else { - newLimit = currentLimit * 3 / 4 - } - if newLimit < 1 { - newLimit = 1 - } - } - } else if isRemoteNetwork { - if isSlowWorker { - if rttVar < smoothedRTT/8 { - newLimit = currentLimit + (currentLimit / 8) - } else if rttVar < smoothedRTT/4 { - newLimit = currentLimit + (currentLimit / 12) - } else { - newLimit = currentLimit + (currentLimit / 20) - } - } else { - if rttVar < smoothedRTT/8 { - newLimit = currentLimit + (currentLimit / 10) - } else if rttVar < smoothedRTT/4 { - newLimit = currentLimit + (currentLimit / 16) + if rttVar < smoothedRTT/4 { + newLimit = currentLimit + (currentLimit / 2) + } else if rttVar < smoothedRTT/2 { + newLimit = currentLimit + (currentLimit / 3) + } else { + newLimit = currentLimit + (currentLimit / 5) + } + } else if isFastWorker { + if rttVar < smoothedRTT/4 { + newLimit = currentLimit + (currentLimit / 3) + } else if rttVar < smoothedRTT/2 { + newLimit = currentLimit + (currentLimit / 5) + } else { + newLimit = currentLimit + (currentLimit / 8) + } } else { - newLimit = currentLimit + (currentLimit / 32) + if rttVar < smoothedRTT/4 { + newLimit = currentLimit + (currentLimit / 4) + } else if rttVar < smoothedRTT/2 { + newLimit = currentLimit + (currentLimit / 6) + } else { + newLimit = currentLimit + (currentLimit / 10) + } } - } - if smoothedRTT > time.Duration(float64(minRTT)*2) { + if smoothedRTT > time.Duration(float64(minRTT)*2) { + if isSlowWorker { + newLimit = currentLimit * 4 / 5 + } else { + newLimit = currentLimit * 3 / 4 + } + if newLimit < 1 { + newLimit = 1 + } + } + } else if isRemoteNetwork { if isSlowWorker { - newLimit = currentLimit * 3 / 5 + if rttVar < smoothedRTT/8 { + newLimit = currentLimit + (currentLimit / 8) + } else if rttVar < smoothedRTT/4 { + newLimit = currentLimit + (currentLimit / 12) + } else { + newLimit = currentLimit + (currentLimit / 20) + } } else { - newLimit = currentLimit / 2 + if rttVar < smoothedRTT/8 { + newLimit = currentLimit + (currentLimit / 10) + } else if rttVar < smoothedRTT/4 { + newLimit = currentLimit + (currentLimit / 16) + } else { + newLimit = currentLimit + (currentLimit / 32) + } } - if newLimit < 1 { - newLimit = 1 + + if smoothedRTT > time.Duration(float64(minRTT)*2) { + if isSlowWorker { + newLimit = currentLimit * 3 / 5 + } else { + newLimit = currentLimit / 2 + } + if newLimit < 1 { + newLimit = 1 + } } - } - } else { - rttMultiplier := t.rttStats.GetAdaptiveRTTMultiplier() - - if isSlowWorker { - rttMultiplier *= 0.8 - } else if isFastWorker { - rttMultiplier *= 1.2 - } - - if rttVar < smoothedRTT/4 { - newLimit = currentLimit + int64(float64(currentLimit)/(6*rttMultiplier)) } else { - newLimit = currentLimit + int64(float64(currentLimit)/(12*rttMultiplier)) - } - - if smoothedRTT > time.Duration(float64(minRTT)*3) { - backoffFactor := 2.0 * rttMultiplier + rttMultiplier := t.rttStats.GetAdaptiveRTTMultiplier() + if isSlowWorker { - backoffFactor *= 0.8 // Less aggressive backoff for slow workers + rttMultiplier *= 0.8 + } else if isFastWorker { + rttMultiplier *= 1.2 } - newLimit = int64(float64(currentLimit) / backoffFactor) - if newLimit < 1 { - newLimit = 1 + + if rttVar < smoothedRTT/4 { + newLimit = currentLimit + int64(float64(currentLimit)/(6*rttMultiplier)) + } else { + newLimit = currentLimit + int64(float64(currentLimit)/(12*rttMultiplier)) + } + + if smoothedRTT > time.Duration(float64(minRTT)*3) { + backoffFactor := 2.0 * rttMultiplier + if isSlowWorker { + backoffFactor *= 0.8 // Less aggressive backoff for slow workers + } + newLimit = int64(float64(currentLimit) / backoffFactor) + if newLimit < 1 { + newLimit = 1 + } } } - } // Cap at maxBufferCount if newLimit > int64(t.maxBufferCount) { From 79ca2e446fa0b8d4e18bd36d07775ed1b9265eae Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 28 May 2025 14:50:54 +0000 Subject: [PATCH 12/15] Fix indentation in transfer.go for dynamic RTT probing Co-Authored-By: fatedier --- pkg/sender/transfer.go | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/pkg/sender/transfer.go b/pkg/sender/transfer.go index bdfe646..dcd602a 100644 --- a/pkg/sender/transfer.go +++ b/pkg/sender/transfer.go @@ -261,11 +261,11 @@ func (t *Transfer) ackReceiver() { } else { isSlowWorker := false isFastWorker := false - + if t.framesSent > 20 { // Calculate expected throughput based on RTT expectedThroughput := float64(0) - + if isLocalNetwork { expectedThroughput = 500 * 1024 // 500KB/s as a reference point } else if isRemoteNetwork { @@ -273,14 +273,14 @@ func (t *Transfer) ackReceiver() { } else { expectedThroughput = 300 * 1024 // 300KB/s as a reference point } - + if t.currentThroughput < expectedThroughput*0.5 { isSlowWorker = true } else if t.currentThroughput > expectedThroughput*1.5 { isFastWorker = true } } - + if isLocalNetwork { if isSlowWorker { if rttVar < smoothedRTT/4 { @@ -349,13 +349,13 @@ func (t *Transfer) ackReceiver() { } } else { rttMultiplier := t.rttStats.GetAdaptiveRTTMultiplier() - + if isSlowWorker { rttMultiplier *= 0.8 } else if isFastWorker { rttMultiplier *= 1.2 } - + if rttVar < smoothedRTT/4 { newLimit = currentLimit + int64(float64(currentLimit)/(6*rttMultiplier)) } else { @@ -374,19 +374,20 @@ func (t *Transfer) ackReceiver() { } } - // Cap at maxBufferCount - if newLimit > int64(t.maxBufferCount) { - newLimit = int64(t.maxBufferCount) - t.inSlowStart = false + // Cap at maxBufferCount + if newLimit > int64(t.maxBufferCount) { + newLimit = int64(t.maxBufferCount) + t.inSlowStart = false + } + + t.congestionWindow = newLimit + t.limiter.SetLimit(newLimit) + t.lastCongestionAdj = now } - t.congestionWindow = newLimit - t.limiter.SetLimit(newLimit) - t.lastCongestionAdj = now + t.lastMetricTime = now + t.bytesTransferred = 0 } - - t.lastMetricTime = now - t.bytesTransferred = 0 } } From f19d493f30252becb0effa56055551366beb12fa Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 28 May 2025 14:51:41 +0000 Subject: [PATCH 13/15] Enhance adaptive frame sizing with improved RTT thresholds Co-Authored-By: fatedier --- pkg/sender/sender.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sender/sender.go b/pkg/sender/sender.go index fa62e3c..d90a8ea 100644 --- a/pkg/sender/sender.go +++ b/pkg/sender/sender.go @@ -271,7 +271,7 @@ func (sender *Sender) loopSend() { newSize = int(float64(currentSize) * 0.9) // Decrease by 10% } } else if isRemoteNetwork { - if rttVar < smoothedRTT/8 && smoothedRTT < minRTT+minRTT/4 { + if rttVar < smoothedRTT/8 && smoothedRTT < time.Duration(float64(minRTT)*1.25) { newSize = int(float64(currentSize) * 1.1) // Increase by 10% } else if rttVar > smoothedRTT/4 || smoothedRTT > time.Duration(float64(minRTT)*1.5) { newSize = int(float64(currentSize) * 0.75) // Decrease by 25% From bfa847a595b828d9657065eeb2b5a89abbf984c4 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 28 May 2025 14:59:24 +0000 Subject: [PATCH 14/15] Refactor transfer.go to reduce nesting with helper functions Co-Authored-By: fatedier --- pkg/sender/transfer.go | 394 +++++++++++++++++++++++------------------ 1 file changed, 220 insertions(+), 174 deletions(-) diff --git a/pkg/sender/transfer.go b/pkg/sender/transfer.go index dcd602a..08ab780 100644 --- a/pkg/sender/transfer.go +++ b/pkg/sender/transfer.go @@ -193,6 +193,224 @@ func (t *Transfer) frameSender() { } } +func (t *Transfer) adjustLimitInSlowStart(currentLimit int64, smoothedRTT, minRTT time.Duration, + rttMultiplier float64, isLocalNetwork, isRemoteNetwork bool) (newLimit int64, exitSlowStart bool) { + + newLimit = currentLimit + exitSlowStart = false + + if isLocalNetwork { + newLimit = currentLimit * 3 + if smoothedRTT > time.Duration(float64(minRTT)*1.5) && t.framesSent > 10 { + exitSlowStart = true + newLimit = currentLimit * 2 + } + } else if isRemoteNetwork { + newLimit = currentLimit * 2 + if smoothedRTT > time.Duration(float64(minRTT)*3) && t.framesSent > 30 { + exitSlowStart = true + newLimit = currentLimit + } + } else { + newLimit = currentLimit * 2 + multiplier := time.Duration(int64(2 * rttMultiplier)) + if smoothedRTT > minRTT*multiplier && t.framesSent > 20 { + exitSlowStart = true + newLimit = currentLimit + } + } + + return newLimit, exitSlowStart +} + +func (t *Transfer) classifyWorkerType(isLocalNetwork, isRemoteNetwork bool) (isSlowWorker, isFastWorker bool) { + if t.framesSent <= 20 { + return false, false + } + + // Calculate expected throughput based on network type + var expectedThroughput float64 + if isLocalNetwork { + expectedThroughput = 500 * 1024 // 500KB/s as a reference point + } else if isRemoteNetwork { + expectedThroughput = 200 * 1024 // 200KB/s as a reference point + } else { + expectedThroughput = 300 * 1024 // 300KB/s as a reference point + } + + isSlowWorker = t.currentThroughput < expectedThroughput*0.5 + isFastWorker = t.currentThroughput > expectedThroughput*1.5 + + return isSlowWorker, isFastWorker +} + +func (t *Transfer) adjustLimitForLocalNetwork(currentLimit int64, rttVar, smoothedRTT, minRTT time.Duration, + isSlowWorker bool) int64 { + + newLimit := currentLimit + + if isSlowWorker { + if rttVar < smoothedRTT/4 { + newLimit = currentLimit + (currentLimit / 2) + } else if rttVar < smoothedRTT/2 { + newLimit = currentLimit + (currentLimit / 3) + } else { + newLimit = currentLimit + (currentLimit / 5) + } + } else { // Normal or fast worker + if rttVar < smoothedRTT/4 { + newLimit = currentLimit + (currentLimit / 4) + } else if rttVar < smoothedRTT/2 { + newLimit = currentLimit + (currentLimit / 6) + } else { + newLimit = currentLimit + (currentLimit / 10) + } + } + + if smoothedRTT > time.Duration(float64(minRTT)*2) { + if isSlowWorker { + newLimit = currentLimit * 4 / 5 + } else { + newLimit = currentLimit * 3 / 4 + } + if newLimit < 1 { + newLimit = 1 + } + } + + return newLimit +} + +func (t *Transfer) adjustLimitForRemoteNetwork(currentLimit int64, rttVar, smoothedRTT, minRTT time.Duration, + isSlowWorker bool) int64 { + + newLimit := currentLimit + + if isSlowWorker { + if rttVar < smoothedRTT/8 { + newLimit = currentLimit + (currentLimit / 8) + } else if rttVar < smoothedRTT/4 { + newLimit = currentLimit + (currentLimit / 12) + } else { + newLimit = currentLimit + (currentLimit / 20) + } + } else { // Normal or fast worker + if rttVar < smoothedRTT/8 { + newLimit = currentLimit + (currentLimit / 10) + } else if rttVar < smoothedRTT/4 { + newLimit = currentLimit + (currentLimit / 16) + } else { + newLimit = currentLimit + (currentLimit / 32) + } + } + + if smoothedRTT > time.Duration(float64(minRTT)*2) { + if isSlowWorker { + newLimit = currentLimit * 3 / 5 + } else { + newLimit = currentLimit / 2 + } + if newLimit < 1 { + newLimit = 1 + } + } + + return newLimit +} + +func (t *Transfer) adjustLimitForUnknownNetwork(currentLimit int64, rttVar, smoothedRTT, minRTT time.Duration, + rttMultiplier float64, isSlowWorker, isFastWorker bool) int64 { + + newLimit := currentLimit + + adjustedMultiplier := rttMultiplier + if isSlowWorker { + adjustedMultiplier *= 0.8 + } else if isFastWorker { + adjustedMultiplier *= 1.2 + } + + if rttVar < smoothedRTT/4 { + newLimit = currentLimit + int64(float64(currentLimit)/(6*adjustedMultiplier)) + } else { + newLimit = currentLimit + int64(float64(currentLimit)/(12*adjustedMultiplier)) + } + + if smoothedRTT > time.Duration(float64(minRTT)*3) { + backoffFactor := 2.0 * adjustedMultiplier + if isSlowWorker { + backoffFactor *= 0.8 // Less aggressive backoff for slow workers + } + newLimit = int64(float64(currentLimit) / backoffFactor) + if newLimit < 1 { + newLimit = 1 + } + } + + return newLimit +} + +func (t *Transfer) updateCongestionControl(now time.Time) { + if now.Sub(t.lastCongestionAdj) <= 100*time.Millisecond { + return + } + + smoothedRTT := t.rttStats.GetSmoothedRTT() + rttVar := t.rttStats.GetRTTVariation() + minRTT := t.rttStats.GetMinRTT() + rttMultiplier := t.rttStats.GetAdaptiveRTTMultiplier() + isLocalNetwork := t.rttStats.IsLocalNetwork() + isRemoteNetwork := t.rttStats.IsRemoteNetwork() + + currentLimit := t.limiter.LimitNum() + var newLimit int64 = currentLimit + + if t.inSlowStart { + newLimit, t.inSlowStart = t.adjustLimitInSlowStart( + currentLimit, smoothedRTT, minRTT, rttMultiplier, isLocalNetwork, isRemoteNetwork) + } else { + isSlowWorker, isFastWorker := t.classifyWorkerType(isLocalNetwork, isRemoteNetwork) + + if isLocalNetwork { + newLimit = t.adjustLimitForLocalNetwork(currentLimit, rttVar, smoothedRTT, minRTT, isSlowWorker) + } else if isRemoteNetwork { + newLimit = t.adjustLimitForRemoteNetwork(currentLimit, rttVar, smoothedRTT, minRTT, isSlowWorker) + } else { + newLimit = t.adjustLimitForUnknownNetwork( + currentLimit, rttVar, smoothedRTT, minRTT, rttMultiplier, isSlowWorker, isFastWorker) + } + } + + // Cap at maxBufferCount + if newLimit > int64(t.maxBufferCount) { + newLimit = int64(t.maxBufferCount) + t.inSlowStart = false + } + + t.congestionWindow = newLimit + t.limiter.SetLimit(newLimit) + t.lastCongestionAdj = now +} + +func (t *Transfer) updateThroughputMetrics(now time.Time) { + if t.framesSent%10 != 0 && now.Sub(t.lastMetricTime) <= time.Second { + return + } + + elapsedSeconds := now.Sub(t.lastMetricTime).Seconds() + if elapsedSeconds <= 0 { + return + } + + // Calculate bytes per second + t.currentThroughput = float64(t.bytesTransferred) / elapsedSeconds + + t.updateCongestionControl(now) + + t.lastMetricTime = now + t.bytesTransferred = 0 +} + func (t *Transfer) ackReceiver() { defer t.recvShutdown.Done() @@ -215,185 +433,13 @@ func (t *Transfer) ackReceiver() { t.bytesTransferred += uint64(len(sf.Frame().Buf)) } - // Calculate throughput every 10 frames or at least once per second now := time.Now() - if t.framesSent%10 == 0 || now.Sub(t.lastMetricTime) > time.Second { - elapsedSeconds := now.Sub(t.lastMetricTime).Seconds() - if elapsedSeconds > 0 { - // Calculate bytes per second - t.currentThroughput = float64(t.bytesTransferred) / elapsedSeconds - - if now.Sub(t.lastCongestionAdj) > 100*time.Millisecond { - smoothedRTT := t.rttStats.GetSmoothedRTT() - rttVar := t.rttStats.GetRTTVariation() - minRTT := t.rttStats.GetMinRTT() - rttMultiplier := t.rttStats.GetAdaptiveRTTMultiplier() - isLocalNetwork := t.rttStats.IsLocalNetwork() - isRemoteNetwork := t.rttStats.IsRemoteNetwork() - - currentLimit := t.limiter.LimitNum() - newLimit := currentLimit - - if t.inSlowStart { - if isLocalNetwork { - newLimit = currentLimit * 3 - - if smoothedRTT > time.Duration(float64(minRTT)*1.5) && t.framesSent > 10 { - t.inSlowStart = false - newLimit = currentLimit * 2 - } - } else if isRemoteNetwork { - newLimit = currentLimit * 2 - - if smoothedRTT > time.Duration(float64(minRTT)*3) && t.framesSent > 30 { - t.inSlowStart = false - newLimit = currentLimit - } - } else { - newLimit = currentLimit * 2 - - multiplier := time.Duration(int64(2 * rttMultiplier)) - if smoothedRTT > minRTT*multiplier && t.framesSent > 20 { - t.inSlowStart = false - newLimit = currentLimit - } - } - } else { - isSlowWorker := false - isFastWorker := false - - if t.framesSent > 20 { - // Calculate expected throughput based on RTT - expectedThroughput := float64(0) - - if isLocalNetwork { - expectedThroughput = 500 * 1024 // 500KB/s as a reference point - } else if isRemoteNetwork { - expectedThroughput = 200 * 1024 // 200KB/s as a reference point - } else { - expectedThroughput = 300 * 1024 // 300KB/s as a reference point - } - - if t.currentThroughput < expectedThroughput*0.5 { - isSlowWorker = true - } else if t.currentThroughput > expectedThroughput*1.5 { - isFastWorker = true - } - } - - if isLocalNetwork { - if isSlowWorker { - if rttVar < smoothedRTT/4 { - newLimit = currentLimit + (currentLimit / 2) - } else if rttVar < smoothedRTT/2 { - newLimit = currentLimit + (currentLimit / 3) - } else { - newLimit = currentLimit + (currentLimit / 5) - } - } else if isFastWorker { - if rttVar < smoothedRTT/4 { - newLimit = currentLimit + (currentLimit / 3) - } else if rttVar < smoothedRTT/2 { - newLimit = currentLimit + (currentLimit / 5) - } else { - newLimit = currentLimit + (currentLimit / 8) - } - } else { - if rttVar < smoothedRTT/4 { - newLimit = currentLimit + (currentLimit / 4) - } else if rttVar < smoothedRTT/2 { - newLimit = currentLimit + (currentLimit / 6) - } else { - newLimit = currentLimit + (currentLimit / 10) - } - } - - if smoothedRTT > time.Duration(float64(minRTT)*2) { - if isSlowWorker { - newLimit = currentLimit * 4 / 5 - } else { - newLimit = currentLimit * 3 / 4 - } - if newLimit < 1 { - newLimit = 1 - } - } - } else if isRemoteNetwork { - if isSlowWorker { - if rttVar < smoothedRTT/8 { - newLimit = currentLimit + (currentLimit / 8) - } else if rttVar < smoothedRTT/4 { - newLimit = currentLimit + (currentLimit / 12) - } else { - newLimit = currentLimit + (currentLimit / 20) - } - } else { - if rttVar < smoothedRTT/8 { - newLimit = currentLimit + (currentLimit / 10) - } else if rttVar < smoothedRTT/4 { - newLimit = currentLimit + (currentLimit / 16) - } else { - newLimit = currentLimit + (currentLimit / 32) - } - } - - if smoothedRTT > time.Duration(float64(minRTT)*2) { - if isSlowWorker { - newLimit = currentLimit * 3 / 5 - } else { - newLimit = currentLimit / 2 - } - if newLimit < 1 { - newLimit = 1 - } - } - } else { - rttMultiplier := t.rttStats.GetAdaptiveRTTMultiplier() - - if isSlowWorker { - rttMultiplier *= 0.8 - } else if isFastWorker { - rttMultiplier *= 1.2 - } - - if rttVar < smoothedRTT/4 { - newLimit = currentLimit + int64(float64(currentLimit)/(6*rttMultiplier)) - } else { - newLimit = currentLimit + int64(float64(currentLimit)/(12*rttMultiplier)) - } - - if smoothedRTT > time.Duration(float64(minRTT)*3) { - backoffFactor := 2.0 * rttMultiplier - if isSlowWorker { - backoffFactor *= 0.8 // Less aggressive backoff for slow workers - } - newLimit = int64(float64(currentLimit) / backoffFactor) - if newLimit < 1 { - newLimit = 1 - } - } - } - - // Cap at maxBufferCount - if newLimit > int64(t.maxBufferCount) { - newLimit = int64(t.maxBufferCount) - t.inSlowStart = false - } - - t.congestionWindow = newLimit - t.limiter.SetLimit(newLimit) - t.lastCongestionAdj = now - } - - t.lastMetricTime = now - t.bytesTransferred = 0 - } - } - } + t.updateThroughputMetrics(now) delete(t.waitAcks, ack.FrameID) } t.mu.Unlock() + if ok { t.limiter.Release() } From 7ac13abf61188610f09a9c91d9898d714560648b Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 28 May 2025 15:00:20 +0000 Subject: [PATCH 15/15] Fix formatting in rtt.go with go fmt Co-Authored-By: fatedier --- pkg/sender/rtt.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/sender/rtt.go b/pkg/sender/rtt.go index 72ea4c1..97c5cff 100644 --- a/pkg/sender/rtt.go +++ b/pkg/sender/rtt.go @@ -161,7 +161,7 @@ func (r *RTTStats) IsRemoteNetwork() bool { func (r *RTTStats) GetAdaptiveRTTMultiplier() float64 { r.mu.Lock() defer r.mu.Unlock() - + // Base multiplier based on network environment var baseMultiplier float64 switch r.networkEnvironment { @@ -172,24 +172,24 @@ func (r *RTTStats) GetAdaptiveRTTMultiplier() float64 { default: baseMultiplier = 1.0 // Default multiplier } - + if r.samples < 10 { return baseMultiplier // Not enough samples to make adjustments } - + stabilityRatio := float64(r.rttVar) / float64(r.smoothedRTT) - + if stabilityRatio < 0.1 { return baseMultiplier * 0.8 } else if stabilityRatio > 0.3 { return baseMultiplier * 1.5 } - + if r.latestRTT < r.smoothedRTT { return baseMultiplier * 0.9 } else if r.latestRTT > time.Duration(float64(r.smoothedRTT)*1.2) { return baseMultiplier * 1.3 } - + return baseMultiplier }