From e7023e5fe21b45e02489232dfd0039dc8ca50a0d Mon Sep 17 00:00:00 2001 From: Abolfazl Date: Tue, 31 Mar 2026 19:54:45 +0330 Subject: [PATCH 1/2] Add backpressure handling for txChannel Implemented backpressure handling for txChannel to prevent packet loss. --- internal/client/dispatcher.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/internal/client/dispatcher.go b/internal/client/dispatcher.go index 9fd39da6..c1fe3cb7 100644 --- a/internal/client/dispatcher.go +++ b/internal/client/dispatcher.go @@ -397,16 +397,16 @@ dispatchLoop: select { case c.txChannel <- pkt: - // if !isLogged && pkt.packetType != Enums.PACKET_PING { - // packedSummary := "" - // if opts.PacketType == Enums.PACKET_PACKED_CONTROL_BLOCKS { - // packedSummary = " | " + VpnProto.DescribePackedControlBlocks(opts.Payload, 4) - // } - // c.logOutboundPacket(opts.PacketType, opts.SessionID, len(opts.Payload), opts.StreamID, opts.SequenceNum, opts.FragmentID, opts.TotalFragments, packedSummary) - // } - // isLogged = true default: - c.log.Warnf("TX channel filled before enqueue completed | Packet: %s | Stream: %d", Enums.PacketTypeName(finalPacket.packetType), selectedStreamID) + // Brief backpressure: wait up to one idle-poll interval before dropping. + // This avoids silent packet loss that forces expensive ARQ retransmits. + select { + case c.txChannel <- pkt: + case <-time.After(idlePoll): + c.log.Warnf("TX channel full, packet dropped | Packet: %s | Stream: %d", Enums.PacketTypeName(finalPacket.packetType), selectedStreamID) + case <-ctx.Done(): + return + } } } From 81236d231239913e04b374d917003ed04fa8b712 Mon Sep 17 00:00:00 2001 From: Abolfazl Date: Tue, 31 Mar 2026 22:19:21 +0330 Subject: [PATCH 2/2] Enhance dispatch loop with context and idle timer handling Refactor dispatch loop to handle context cancellation and idle timer more effectively, improving packet handling and backpressure management. --- internal/client/dispatcher.go | 46 ++++++++++++++++++++++++++--------- 1 file changed, 34 insertions(+), 12 deletions(-) diff --git a/internal/client/dispatcher.go b/internal/client/dispatcher.go index c1fe3cb7..ca91ec21 100644 --- a/internal/client/dispatcher.go +++ b/internal/client/dispatcher.go @@ -197,10 +197,25 @@ dispatchLoop: } if !c.txChannelHasCapacity(len(conns)) { - if !waitForWork() { + select { + case <-ctx.Done(): return + case <-c.txSpaceSignal: + if !c.txChannelHasCapacity(len(conns)) { + continue dispatchLoop + } + case <-idleTimer.C: + if !idleTimer.Stop() { + select { + case <-idleTimer.C: + default: + } + } + idleTimer.Reset(idlePoll) + if !c.txChannelHasCapacity(len(conns)) { + continue dispatchLoop + } } - continue dispatchLoop } var item *clientStreamTXPacket @@ -375,7 +390,8 @@ dispatchLoop: } packetByDomain := make(map[string][]byte, len(conns)) - // var isLogged bool = false + pendingPkts := make([]asyncPacket, 0, len(conns)) + for _, conn := range conns { domain := conn.Domain if domain == "" { @@ -394,19 +410,25 @@ dispatchLoop: pkt := finalPacket pkt.conn = conn pkt.payload = dnsPacket + pendingPkts = append(pendingPkts, pkt) + } + if !c.txChannelHasCapacity(len(pendingPkts)) { + select { + case c.txSignal <- struct{}{}: + default: + } + if !wasPacked && selected != nil { + selected.ReleaseTXPacket(item) + } + continue dispatchLoop + } + + for _, pkt := range pendingPkts { select { case c.txChannel <- pkt: default: - // Brief backpressure: wait up to one idle-poll interval before dropping. - // This avoids silent packet loss that forces expensive ARQ retransmits. - select { - case c.txChannel <- pkt: - case <-time.After(idlePoll): - c.log.Warnf("TX channel full, packet dropped | Packet: %s | Stream: %d", Enums.PacketTypeName(finalPacket.packetType), selectedStreamID) - case <-ctx.Done(): - return - } + c.log.Warnf("TX channel race: packet dropped | Packet: %s | Stream: %d", Enums.PacketTypeName(finalPacket.packetType), selectedStreamID) } }