diff --git a/internal/client/dispatcher.go b/internal/client/dispatcher.go index 9fd39da6..ca91ec21 100644 --- a/internal/client/dispatcher.go +++ b/internal/client/dispatcher.go @@ -197,10 +197,25 @@ dispatchLoop: } if !c.txChannelHasCapacity(len(conns)) { - if !waitForWork() { + select { + case <-ctx.Done(): return + case <-c.txSpaceSignal: + if !c.txChannelHasCapacity(len(conns)) { + continue dispatchLoop + } + case <-idleTimer.C: + if !idleTimer.Stop() { + select { + case <-idleTimer.C: + default: + } + } + idleTimer.Reset(idlePoll) + if !c.txChannelHasCapacity(len(conns)) { + continue dispatchLoop + } } - continue dispatchLoop } var item *clientStreamTXPacket @@ -375,7 +390,8 @@ dispatchLoop: } packetByDomain := make(map[string][]byte, len(conns)) - // var isLogged bool = false + pendingPkts := make([]asyncPacket, 0, len(conns)) + for _, conn := range conns { domain := conn.Domain if domain == "" { @@ -394,19 +410,25 @@ dispatchLoop: pkt := finalPacket pkt.conn = conn pkt.payload = dnsPacket + pendingPkts = append(pendingPkts, pkt) + } + + if !c.txChannelHasCapacity(len(pendingPkts)) { + select { + case c.txSignal <- struct{}{}: + default: + } + if !wasPacked && selected != nil { + selected.ReleaseTXPacket(item) + } + continue dispatchLoop + } + for _, pkt := range pendingPkts { select { case c.txChannel <- pkt: - // if !isLogged && pkt.packetType != Enums.PACKET_PING { - // packedSummary := "" - // if opts.PacketType == Enums.PACKET_PACKED_CONTROL_BLOCKS { - // packedSummary = " | " + VpnProto.DescribePackedControlBlocks(opts.Payload, 4) - // } - // c.logOutboundPacket(opts.PacketType, opts.SessionID, len(opts.Payload), opts.StreamID, opts.SequenceNum, opts.FragmentID, opts.TotalFragments, packedSummary) - // } - // isLogged = true default: - c.log.Warnf("TX channel filled before enqueue completed | Packet: %s | Stream: %d", Enums.PacketTypeName(finalPacket.packetType), selectedStreamID) + c.log.Warnf("TX channel race: packet dropped | Packet: %s | Stream: %d", Enums.PacketTypeName(finalPacket.packetType), selectedStreamID) } }