diff --git a/README.md b/README.md index 7259542..63791e3 100644 --- a/README.md +++ b/README.md @@ -139,8 +139,13 @@ sudo ip6tables -t nat -I PREROUTING -i eth0 -p udp --dport 53 -j REDIRECT --to-p | `-queue-size N` | Packet queue size for transport and DNS layers | `512` | | `-kcp-window-size N` | KCP send/receive window size in packets (0 = queue-size/2) | `0` | | `-queue-overflow MODE` | Queue overflow behavior: `drop` (silent discard) or `block` (backpressure) | `drop` | +| `-response-queue-size N` | Pending DNS response queue size (0 = `queue-size`) | `0` | +| `-response-workers N` | Number of DNS response sender workers | `2` | +| `-response-delay D` | Max time to hold a DNS response open while waiting for downstream data | `200ms` | | `-log-level LEVEL` | Log level: debug, info, warning, error | `info` | +> **Note:** The server response queue is drop-based. When it fills, pending DNS responses are dropped and counted in the debug stats log as `response_dropped`. + ### Client flags #### Transport (pick one) @@ -168,8 +173,11 @@ sudo ip6tables -t nat -I PREROUTING -i eth0 -p udp --dport 53 -j REDIRECT --to-p | `-keepalive D` | Keepalive ping interval (must match server, must be < idle-timeout) | `2s` | | `-max-streams N` | Max concurrent streams per session (0 = unlimited) | `0` | | `-open-stream-timeout D` | Timeout for opening an smux stream | `10s` | +| `-open-stream-failure-limit N` | Retire an idle session after this many consecutive stream-open failures | `3` | | `-reconnect-min D` | Initial backoff delay for session reconnect | `1s` | | `-reconnect-max D` | Max backoff delay (must be >= reconnect-min) | `30s` | +| `-session-check-interval D` | How often the managed client checks session and transport health | `500ms` | +| `-udp-transport-stale-timeout D` | Retire the current session if per-query UDP sees no valid response for this long while streams need transport | `3s` | > **Note:** `idle-timeout` and `keepalive` must be set to the same values on both client and server — mismatched values will cause one side to close the session before the other detects it. Keep `keepalive` well below `idle-timeout` (the default 5x ratio allows ~5 ping attempts before timeout). > @@ -185,6 +193,9 @@ These flags only apply when using `-udp`. By default, each query is sent from a | `-udp-timeout D` | Per-query response timeout — the total time a worker waits for a valid (NOERROR) response. Forged responses are discarded but the deadline is not extended — if no valid response arrives within this window, the query is abandoned. | `500ms` | | `-udp-shared-socket` | Use a single shared UDP socket instead of per-query sockets. By default, each query is sent from a new socket with a random ephemeral source port, making the tunnel harder to fingerprint or block by port. With this flag, all queries share one socket and source port for the lifetime of the client — blocking that port kills the tunnel. | `false` | | `-udp-accept-errors` | In per-query mode, accept the first DNS response regardless of RCODE instead of waiting for a NOERROR response. This disables forged response filtering — the worker stops waiting after the first forged response, so the real response is likely lost. Only useful for debugging; not recommended in production. Ignored when `-udp-shared-socket` is set. | `false` | +| `-poll-delay D` | Base delay before sending an empty DNS poll when idle | `500ms` | +| `-active-poll-delay D` | Poll delay cap while streams are active or being opened | `200ms` | +| `-poll-max-delay D` | Max idle backoff between empty DNS polls | `2s` | #### Queue and KCP tuning diff --git a/client/client.go b/client/client.go index 6bc6acc..3b8d4b7 100644 --- a/client/client.go +++ b/client/client.go @@ -27,6 +27,7 @@ import ( "net" "net/http" "sync" + "sync/atomic" "syscall" "time" @@ -41,16 +42,21 @@ import ( // Default timeouts for VayDNS mode. const ( - DefaultIdleTimeout = 10 * time.Second - DefaultKeepAlive = 2 * time.Second - DefaultOpenStreamTimeout = 10 * time.Second - DefaultReconnectDelay = 1 * time.Second - DefaultReconnectMaxDelay = 30 * time.Second - DefaultSessionCheckInterval = 500 * time.Millisecond - DefaultUDPResponseTimeout = 500 * time.Millisecond - DefaultUDPWorkers = 100 - DefaultMaxStreams = 0 // unlimited - DefaultHandshakeTimeout = 15 * time.Second + DefaultIdleTimeout = 10 * time.Second + DefaultKeepAlive = 2 * time.Second + DefaultOpenStreamTimeout = 10 * time.Second + DefaultReconnectDelay = 1 * time.Second + DefaultReconnectMaxDelay = 30 * time.Second + DefaultSessionCheckInterval = 500 * time.Millisecond + DefaultUDPResponseTimeout = 500 * time.Millisecond + DefaultUDPWorkers = 100 + DefaultMaxStreams = 0 // unlimited + DefaultHandshakeTimeout = 15 * time.Second + DefaultPollDelay = 500 * time.Millisecond + DefaultActivePollDelay = 200 * time.Millisecond + DefaultPollMaxDelay = 2 * time.Second + DefaultUDPTransportStaleTimeout = 3 * time.Second + DefaultOpenStreamFailureLimit = 3 ) // Default timeouts for dnstt compatibility mode. @@ -194,17 +200,22 @@ type Tunnel struct { TunnelServer TunnelServer // Session configuration. Zero values use defaults. - IdleTimeout time.Duration // default: 10s (2m with DnsttCompat) - KeepAlive time.Duration // default: 2s (10s with DnsttCompat) - OpenStreamTimeout time.Duration // default: 10s - MaxStreams int // default: 0 (0 = unlimited) - ReconnectMinDelay time.Duration // default: 1s - ReconnectMaxDelay time.Duration // default: 30s - SessionCheckInterval time.Duration // default: 500ms - HandshakeTimeout time.Duration // default: 15s - PacketQueueSize int // default: QueueSize (512) - KCPWindowSize int // default: PacketQueueSize/2 - QueueOverflowMode turbotunnel.QueueOverflowMode // default: drop + IdleTimeout time.Duration // default: 10s (2m with DnsttCompat) + KeepAlive time.Duration // default: 2s (10s with DnsttCompat) + OpenStreamTimeout time.Duration // default: 10s + MaxStreams int // default: 0 (0 = unlimited) + ReconnectMinDelay time.Duration // default: 1s + ReconnectMaxDelay time.Duration // default: 30s + SessionCheckInterval time.Duration // default: 500ms + HandshakeTimeout time.Duration // default: 15s + PacketQueueSize int // default: QueueSize (512) + KCPWindowSize int // default: PacketQueueSize/2 + QueueOverflowMode turbotunnel.QueueOverflowMode // default: drop + PollDelay time.Duration // default: 500ms + ActivePollDelay time.Duration // default: 200ms + PollMaxDelay time.Duration // default: 2s + UDPTransportStaleTimeout time.Duration // default: 3s (UDP per-query only) + OpenStreamFailureLimit int // default: 3 consecutive idle failures // internal state wireConfig turbotunnel.WireConfig @@ -215,6 +226,7 @@ type Tunnel struct { noiseChannel io.ReadWriteCloser smuxSession *smux.Session remoteAddr net.Addr + activeStreams atomic.Int32 } // NewTunnel creates a Tunnel with the given resolver and server configuration. @@ -263,6 +275,21 @@ func (t *Tunnel) applyDefaults() { if t.HandshakeTimeout == 0 { t.HandshakeTimeout = DefaultHandshakeTimeout } + if t.PollDelay == 0 { + t.PollDelay = DefaultPollDelay + } + if t.ActivePollDelay == 0 { + t.ActivePollDelay = DefaultActivePollDelay + } + if t.PollMaxDelay == 0 { + t.PollMaxDelay = DefaultPollMaxDelay + } + if t.UDPTransportStaleTimeout == 0 { + t.UDPTransportStaleTimeout = DefaultUDPTransportStaleTimeout + } + if t.OpenStreamFailureLimit == 0 { + t.OpenStreamFailureLimit = DefaultOpenStreamFailureLimit + } } func (t *Tunnel) effectivePacketQueueSize() int { @@ -370,13 +397,30 @@ func (t *Tunnel) InitiateResolverConnection() error { // InitiateDNSPacketConn wraps the resolver connection with DNS encoding. func (t *Tunnel) InitiateDNSPacketConn(domain dns.Name) error { + t.applyDefaults() var rateLimiter *RateLimiter if t.TunnelServer.RPS > 0 { rateLimiter = NewRateLimiter(t.TunnelServer.RPS) } maxQnameLen := t.TunnelServer.effectiveMaxQnameLen() rrType := t.TunnelServer.effectiveRRType() - t.dnsPacketConn = NewDNSPacketConn(t.resolverConn, t.remoteAddr, domain, rateLimiter, maxQnameLen, t.TunnelServer.MaxNumLabels, t.wireConfig, t.forgedStats, rrType, t.effectivePacketQueueSize(), t.effectiveQueueOverflowMode()) + t.dnsPacketConn = newDNSPacketConn( + t.resolverConn, + t.remoteAddr, + domain, + rateLimiter, + maxQnameLen, + t.TunnelServer.MaxNumLabels, + t.wireConfig, + t.forgedStats, + rrType, + &t.activeStreams, + t.PollDelay, + t.ActivePollDelay, + t.PollMaxDelay, + t.effectivePacketQueueSize(), + t.effectiveQueueOverflowMode(), + ) return nil } @@ -525,6 +569,12 @@ func (t *Tunnel) OpenStream() (net.Conn, error) { // Handle forwards data between a local TCP connection and a tunnel stream. func (t *Tunnel) Handle(lconn *net.TCPConn) error { + if err := lconn.SetNoDelay(true); err != nil { + log.Debugf("local TCP_NODELAY: %v", err) + } + t.activeStreams.Add(1) + defer t.activeStreams.Add(-1) + stream, err := t.OpenStream() if err != nil { return err @@ -603,6 +653,24 @@ func (t *Tunnel) resetTransportLayers() error { return nil } +func (t *Tunnel) udpTransportStaleAge(requireTraffic bool) time.Duration { + if !requireTraffic || t.UDPTransportStaleTimeout <= 0 { + return 0 + } + if t.Resolver.ResolverType != ResolverTypeUDP || t.Resolver.UDPSharedSocket { + return 0 + } + udpConn, ok := t.resolverConn.(*UDPPacketConn) + if !ok { + return 0 + } + lastSuccess := udpConn.lastSuccessTime() + if lastSuccess.IsZero() { + return 0 + } + return time.Since(lastSuccess) +} + // ListenAndServe starts a TCP listener and forwards connections through the // tunnel with automatic session reconnection. This is the main entry point // for the CLI. @@ -659,9 +727,24 @@ func (t *Tunnel) ListenAndServe(listenAddr string) error { for { conn, sess, err = t.createSession(mtu) if err == nil { + log.Infof("session %08x ready", conn.GetConv()) + break + } + log.Warnf("session creation failed: %v; rebuilding transport and retrying in %v", err, delay) + t.closeTransportLayers() + for { + if err := t.resetTransportLayers(); err != nil { + log.Warnf("transport rebuild failed: %v; retrying in %v", err, delay) + time.Sleep(delay) + delay *= 2 + if delay > t.ReconnectMaxDelay { + delay = t.ReconnectMaxDelay + } + continue + } + transportErrCh = t.dnsPacketConn.TransportErrors() break } - log.Warnf("session creation failed: %v; retrying in %v", err, delay) time.Sleep(delay) delay *= 2 if delay > t.ReconnectMaxDelay { @@ -671,6 +754,7 @@ func (t *Tunnel) ListenAndServe(listenAddr string) error { sessDone := sess.CloseChan() conv := conn.GetConv() + var openFailCount atomic.Int32 sessionAlive := true for sessionAlive { @@ -678,6 +762,11 @@ func (t *Tunnel) ListenAndServe(listenAddr string) error { local, err := ln.Accept() if err != nil { if ne, ok := err.(net.Error); ok && ne.Timeout() { + if age := t.udpTransportStaleAge(t.activeStreams.Load() > 0); age > t.UDPTransportStaleTimeout { + log.Warnf("session %08x transport stale after %v with %d active streams", conv, age, t.activeStreams.Load()) + sessionAlive = false + continue + } select { case <-sessDone: sessionAlive = false @@ -706,18 +795,31 @@ func (t *Tunnel) ListenAndServe(listenAddr string) error { continue default: } + if age := t.udpTransportStaleAge(t.activeStreams.Load() > 0 || openFailCount.Load() > 0); age > t.UDPTransportStaleTimeout { + log.Warnf("session %08x transport stale after %v while streams need transport", conv, age) + local.Close() + sessionAlive = false + continue + } - go func(sess *smux.Session, conv uint32) { + go func(local *net.TCPConn, sess *smux.Session, conv uint32, openFailCount *atomic.Int32) { if sem != nil { sem <- struct{}{} defer func() { <-sem }() } defer local.Close() - err := t.handleConn(local.(*net.TCPConn), sess, conv) + err := t.handleConn(local, sess, conv, openFailCount) if err != nil { log.Warnf("handle: %v", err) + if t.activeStreams.Load() == 0 { + failures := openFailCount.Add(1) + if failures >= int32(t.OpenStreamFailureLimit) && !sess.IsClosed() { + log.Warnf("session %08x retiring idle session after %d consecutive stream-open failures", conv, failures) + sess.Close() + } + } } - }(sess, conv) + }(local.(*net.TCPConn), sess, conv, &openFailCount) } log.Warnf("session %08x closed, reconnecting", conv) @@ -733,7 +835,6 @@ func (t *Tunnel) createSession(mtu int) (*kcp.UDPSession, *smux.Session, error) if err != nil { return nil, nil, fmt.Errorf("opening KCP conn: %v", err) } - log.Infof("session %08x ready", conn.GetConv()) conn.SetStreamMode(true) conn.SetNoDelay(0, 0, 0, 1) conn.SetWindowSize(t.effectiveKCPWindowSize(), t.effectiveKCPWindowSize()) @@ -763,11 +864,20 @@ func (t *Tunnel) createSession(mtu int) (*kcp.UDPSession, *smux.Session, error) } // handleConn forwards a single TCP connection through the tunnel session. -func (t *Tunnel) handleConn(local *net.TCPConn, sess *smux.Session, conv uint32) error { +func (t *Tunnel) handleConn(local *net.TCPConn, sess *smux.Session, conv uint32, openFailCount *atomic.Int32) error { + if err := local.SetNoDelay(true); err != nil { + log.Debugf("stream %08x local TCP_NODELAY: %v", conv, err) + } + t.activeStreams.Add(1) + defer t.activeStreams.Add(-1) + stream, err := openStreamWithTimeout(conv, t.OpenStreamTimeout, sess.OpenStream) if err != nil { return err } + if openFailCount != nil { + openFailCount.Store(0) + } defer func() { log.Debugf("stream %08x:%d closed", conv, stream.ID()) diff --git a/client/dns.go b/client/dns.go index 0453b78..53989e2 100644 --- a/client/dns.go +++ b/client/dns.go @@ -27,13 +27,10 @@ const ( // sendLoop has a poll timer that automatically sends an empty polling // query when a certain amount of time has elapsed without a send. The - // poll timer is initially set to initPollDelay. It increases by a - // factor of pollDelayMultiplier every time the poll timer expires, up - // to a maximum of maxPollDelay. The poll timer is reset to - // initPollDelay whenever an a send occurs that is not the result of the - // poll timer expiring. - initPollDelay = 500 * time.Millisecond - maxPollDelay = 10 * time.Second + // poll timer starts at pollDelay, increases by pollDelayMultiplier on + // idle expirations, and is capped at pollMaxDelay. When there are + // active tunnel streams, we instead use activePollDelay both as the + // reset value and as the cap so downstream data keeps flowing quickly. pollDelayMultiplier = 2.0 // A limit on the number of empty poll requests we may send in a burst @@ -65,9 +62,14 @@ func NewRateLimiter(rps float64) *RateLimiter { if rps <= 0 || math.IsNaN(rps) || math.IsInf(rps, 0) { return nil } + capacity := rps + if capacity < 1.0 { + // A fractional rate still needs to be able to accumulate one whole token. + capacity = 1.0 + } return &RateLimiter{ - tokens: rps, - capacity: rps, + tokens: capacity, + capacity: capacity, rate: rps, lastTime: time.Now(), } @@ -180,6 +182,13 @@ type DNSPacketConn struct { maxNumLabels int // Forged response tracking (shared with UDPPacketConn in per-query mode) forgedStats *ForgedStats + // activeStreams is incremented while stream establishment or stream I/O is + // in progress. When positive, sendLoop keeps polling aggressively instead + // of backing off to the idle maximum. + activeStreams *atomic.Int32 + pollDelay time.Duration + activePollDelay time.Duration + pollMaxDelay time.Duration // Transport error reporting for session health monitoring transportErr chan error // QueuePacketConn is the direct receiver of ReadFrom and WriteTo calls. @@ -197,9 +206,27 @@ type DNSPacketConn struct { // forgedStats is shared with the transport layer (e.g. UDPPacketConn) for // consistent forged response tracking; if nil, a new instance is created. func NewDNSPacketConn(transport net.PacketConn, addr net.Addr, domain dns.Name, rateLimiter *RateLimiter, maxQnameLen int, maxNumLabels int, wireConfig turbotunnel.WireConfig, forgedStats *ForgedStats, rrType uint16, queueSize int, overflowMode turbotunnel.QueueOverflowMode) *DNSPacketConn { + return newDNSPacketConn(transport, addr, domain, rateLimiter, maxQnameLen, maxNumLabels, wireConfig, forgedStats, rrType, nil, DefaultPollDelay, DefaultActivePollDelay, DefaultPollMaxDelay, queueSize, overflowMode) +} + +// newDNSPacketConn is the internal constructor that can optionally receive a +// pointer to the active stream counter for stream-aware polling. +func newDNSPacketConn(transport net.PacketConn, addr net.Addr, domain dns.Name, rateLimiter *RateLimiter, maxQnameLen int, maxNumLabels int, wireConfig turbotunnel.WireConfig, forgedStats *ForgedStats, rrType uint16, activeStreams *atomic.Int32, pollDelay time.Duration, activePollDelay time.Duration, pollMaxDelay time.Duration, queueSize int, overflowMode turbotunnel.QueueOverflowMode) *DNSPacketConn { if maxQnameLen <= 0 || maxQnameLen > 253 { maxQnameLen = 253 } + if pollDelay <= 0 { + pollDelay = DefaultPollDelay + } + if activePollDelay <= 0 { + activePollDelay = DefaultActivePollDelay + } + if pollMaxDelay <= 0 { + pollMaxDelay = DefaultPollMaxDelay + } + if pollMaxDelay < pollDelay { + pollMaxDelay = pollDelay + } if forgedStats == nil { forgedStats = &ForgedStats{} } @@ -218,6 +245,10 @@ func NewDNSPacketConn(transport net.PacketConn, addr net.Addr, domain dns.Name, maxQnameLen: maxQnameLen, maxNumLabels: maxNumLabels, forgedStats: forgedStats, + activeStreams: activeStreams, + pollDelay: pollDelay, + activePollDelay: activePollDelay, + pollMaxDelay: pollMaxDelay, transportErr: make(chan error, 2), QueuePacketConn: turbotunnel.NewQueuePacketConn(clientID, 0, queueSize, overflowMode), } @@ -260,6 +291,20 @@ func (c *DNSPacketConn) TransportErrors() <-chan error { return c.transportErr } +func (c *DNSPacketConn) currentPollDelay() time.Duration { + if c.activeStreams != nil && c.activeStreams.Load() > 0 { + return c.activePollDelay + } + return c.pollDelay +} + +func (c *DNSPacketConn) currentPollMaxDelay() time.Duration { + if c.activeStreams != nil && c.activeStreams.Load() > 0 { + return c.activePollDelay + } + return c.pollMaxDelay +} + // dnsResponsePayload extracts the downstream payload of a DNS response. It // returns (nil, true) when the response has a non-NoError RCODE, indicating a // forged or hijacked response. It returns (payload, false) on success or @@ -559,7 +604,7 @@ func (c *DNSPacketConn) send(transport net.PacketConn, p []byte, addr net.Addr) // on the network using send. It also does polling with empty packets when // requested by pollChan or after a timeout. func (c *DNSPacketConn) sendLoop(transport net.PacketConn, addr net.Addr) error { - pollDelay := initPollDelay + pollDelay := c.currentPollDelay() pollTimer := time.NewTimer(pollDelay) defer pollTimer.Stop() outgoing := c.QueuePacketConn.OutgoingQueue(addr) @@ -597,8 +642,9 @@ func (c *DNSPacketConn) sendLoop(transport net.PacketConn, addr net.Addr) error // We're polling because it's been a while since we last // polled. Increase the poll delay. pollDelay = time.Duration(float64(pollDelay) * pollDelayMultiplier) - if pollDelay > maxPollDelay { - pollDelay = maxPollDelay + currentMaxPollDelay := c.currentPollMaxDelay() + if pollDelay > currentMaxPollDelay { + pollDelay = currentMaxPollDelay } } else { // We're sending an actual data packet, or we're polling @@ -607,7 +653,7 @@ func (c *DNSPacketConn) sendLoop(transport net.PacketConn, addr net.Addr) error if !pollTimer.Stop() { <-pollTimer.C } - pollDelay = initPollDelay + pollDelay = c.currentPollDelay() } pollTimer.Reset(pollDelay) diff --git a/client/udp.go b/client/udp.go index 8eae1b5..7bcf17f 100644 --- a/client/udp.go +++ b/client/udp.go @@ -3,6 +3,7 @@ package client import ( "context" "net" + "sync/atomic" "syscall" "time" @@ -29,6 +30,7 @@ type UDPPacketConn struct { responseTimeout time.Duration ignoreErrors bool forgedStats *ForgedStats + lastSuccess atomic.Int64 *turbotunnel.QueuePacketConn } @@ -46,12 +48,25 @@ func NewUDPPacketConn(remoteAddr net.Addr, dialerControl func(network, address s forgedStats: stats, QueuePacketConn: turbotunnel.NewQueuePacketConn(remoteAddr, 0, queueSize, overflowMode), } + pconn.markSuccess() for i := 0; i < numWorkers; i++ { go pconn.sendLoop() } return pconn, stats, nil } +func (c *UDPPacketConn) markSuccess() { + c.lastSuccess.Store(time.Now().UnixNano()) +} + +func (c *UDPPacketConn) lastSuccessTime() time.Time { + ns := c.lastSuccess.Load() + if ns == 0 { + return time.Time{} + } + return time.Unix(0, ns) +} + // sendLoop is the per-worker loop. It dequeues one packet at a time from the // outgoing queue, sends it on a fresh UDP socket, reads the response, and // queues valid responses for the upper layer. On consecutive send/recv @@ -141,6 +156,7 @@ func (c *UDPPacketConn) sendRecv(p []byte) error { } // Queue the raw wire-format response for the upper layer (dns.go recvLoop). + c.markSuccess() c.QueueIncoming(buf[:n], c.remoteAddr) return nil } diff --git a/docs/client-library.md b/docs/client-library.md index 391b6b0..1aec52f 100644 --- a/docs/client-library.md +++ b/docs/client-library.md @@ -95,6 +95,11 @@ t.SessionCheckInterval = 500 * time.Millisecond t.ReconnectMinDelay = 1 * time.Second t.ReconnectMaxDelay = 30 * time.Second t.HandshakeTimeout = 15 * time.Second +t.PollDelay = 500 * time.Millisecond +t.ActivePollDelay = 200 * time.Millisecond +t.PollMaxDelay = 2 * time.Second +t.UDPTransportStaleTimeout = 3 * time.Second +t.OpenStreamFailureLimit = 3 // Transport queue options t.PacketQueueSize = 512 // queue capacity diff --git a/man/vaydns-client.1 b/man/vaydns-client.1 index 0b92691..e257c6c 100644 --- a/man/vaydns-client.1 +++ b/man/vaydns-client.1 @@ -164,13 +164,26 @@ with Maximum concurrent streams per session. 0 means unlimited. Default: -.Cm 256 . +.Cm 0 . .It Fl open-stream-timeout Ar DURATION Timeout for opening an smux stream. Default: .Cm 10s . +.It Fl session-check-interval Ar DURATION +How often the managed client checks whether the current session and +transport are still healthy. +Default: +.Cm 500ms . + +.It Fl open-stream-failure-limit Ar N +Retire an idle session after +.Ar N +consecutive stream-open failures. +Default: +.Cm 3 . + .It Fl reconnect-min Ar DURATION Initial backoff delay before retrying session creation. Default: @@ -226,7 +239,32 @@ Only applies when using without .Fl udp-shared-socket . Default: -.Cm 400ms . +.Cm 500ms . + +.It Fl poll-delay Ar DURATION +Base delay before sending an empty DNS poll when idle. +Default: +.Cm 500ms . + +.It Fl active-poll-delay Ar DURATION +Polling delay cap while streams are active or being opened. +Default: +.Cm 200ms . + +.It Fl poll-max-delay Ar DURATION +Maximum idle backoff between empty DNS polls. +Default: +.Cm 2s . + +.It Fl udp-transport-stale-timeout Ar DURATION +Retire the current session if per-query UDP sees no valid response for +this long while streams need transport. +Only applies when using +.Fl udp +without +.Fl udp-shared-socket . +Default: +.Cm 3s . .It Fl udp-shared-socket Use a single shared UDP socket instead of per-query sockets. @@ -290,10 +328,30 @@ Ignored when .Fl dnstt-compat is set. +.It Fl queue-size Ar N +Packet queue size for the transport and DNS layers. +Default: +.Cm 512 . + +.It Fl kcp-window-size Ar N +KCP send and receive window size in packets. +0 means half of +.Fl queue-size . +Default: +.Cm 0 . + +.It Fl queue-overflow Ar MODE +Queue overflow behavior: +.Cm drop +or +.Cm block . +Default: +.Cm drop . + .It Fl log-level Ar LEVEL Log level: debug, info, warning, error. Default: -.Cm warning . +.Cm info . .El diff --git a/man/vaydns-server.1 b/man/vaydns-server.1 index 29ba74a..dfad8c4 100644 --- a/man/vaydns-server.1 +++ b/man/vaydns-server.1 @@ -149,6 +149,24 @@ Default: with .Fl dnstt-compat ) . +.It Fl response-delay Ar DURATION +Maximum time to hold a DNS response open while waiting for downstream +data before sending an empty response. +Default: +.Cm 200ms . + +.It Fl response-queue-size Ar N +Pending DNS response queue size. +0 means +.Fl queue-size . +Default: +.Cm 0 . + +.It Fl response-workers Ar N +Number of DNS response sender workers. +Default: +.Cm 2 . + .It Fl fallback Ar ADDR UDP endpoint to forward non-DNS packets to. If an incoming packet is not a valid DNS message, @@ -176,10 +194,30 @@ Ignored when .Fl dnstt-compat is set. +.It Fl queue-size Ar N +Packet queue size for the DNS tunnel transport. +Default: +.Cm 512 . + +.It Fl kcp-window-size Ar N +KCP send and receive window size in packets. +0 means half of +.Fl queue-size . +Default: +.Cm 0 . + +.It Fl queue-overflow Ar MODE +Queue overflow behavior: +.Cm drop +or +.Cm block . +Default: +.Cm drop . + .It Fl log-level Ar LEVEL Log level: debug, info, warning, error. Default: -.Cm warning . +.Cm info . .El diff --git a/vaydns-client/main.go b/vaydns-client/main.go index fc7e621..5ac75ce 100644 --- a/vaydns-client/main.go +++ b/vaydns-client/main.go @@ -50,6 +50,11 @@ func main() { var reconnectMaxStr string var sessionCheckIntervalStr string var openStreamTimeoutStr string + var pollDelayStr string + var activePollDelayStr string + var pollMaxDelayStr string + var udpTransportStaleTimeoutStr string + var openStreamFailureLimit int var maxStreams int var udpWorkers int var udpSharedSocket bool @@ -129,6 +134,11 @@ Known TLS fingerprints for -utls are: flag.StringVar(&reconnectMaxStr, "reconnect-max", client.DefaultReconnectMaxDelay.String(), "maximum delay before retrying session creation (e.g. 5s, 30s)") flag.StringVar(&sessionCheckIntervalStr, "session-check-interval", client.DefaultSessionCheckInterval.String(), "interval for checking whether the current session is still alive (e.g. 100ms, 500ms)") flag.StringVar(&openStreamTimeoutStr, "open-stream-timeout", client.DefaultOpenStreamTimeout.String(), "timeout for opening an smux stream (e.g. 500ms, 3s)") + flag.StringVar(&pollDelayStr, "poll-delay", client.DefaultPollDelay.String(), "base delay before sending an empty DNS poll when idle (e.g. 500ms, 1s)") + flag.StringVar(&activePollDelayStr, "active-poll-delay", client.DefaultActivePollDelay.String(), "poll delay cap while streams are active or being opened (e.g. 100ms, 200ms)") + flag.StringVar(&pollMaxDelayStr, "poll-max-delay", client.DefaultPollMaxDelay.String(), "maximum idle backoff between empty DNS polls (e.g. 2s, 5s)") + flag.StringVar(&udpTransportStaleTimeoutStr, "udp-transport-stale-timeout", client.DefaultUDPTransportStaleTimeout.String(), "retire the current session if per-query UDP sees no valid response for this long while streams need transport") + flag.IntVar(&openStreamFailureLimit, "open-stream-failure-limit", client.DefaultOpenStreamFailureLimit, "retire an idle session after this many consecutive stream-open failures") flag.IntVar(&maxStreams, "max-streams", client.DefaultMaxStreams, "max concurrent streams per session (0 = unlimited)") flag.IntVar(&udpWorkers, "udp-workers", client.DefaultUDPWorkers, "number of concurrent UDP worker goroutines") flag.BoolVar(&udpSharedSocket, "udp-shared-socket", false, "use a single shared UDP socket instead of per-query sockets") @@ -270,6 +280,26 @@ Known TLS fingerprints for -utls are: fmt.Fprintf(os.Stderr, "invalid -open-stream-timeout: %v\n", err) os.Exit(1) } + pollDelay, err := time.ParseDuration(pollDelayStr) + if err != nil { + fmt.Fprintf(os.Stderr, "invalid -poll-delay: %v\n", err) + os.Exit(1) + } + activePollDelay, err := time.ParseDuration(activePollDelayStr) + if err != nil { + fmt.Fprintf(os.Stderr, "invalid -active-poll-delay: %v\n", err) + os.Exit(1) + } + pollMaxDelay, err := time.ParseDuration(pollMaxDelayStr) + if err != nil { + fmt.Fprintf(os.Stderr, "invalid -poll-max-delay: %v\n", err) + os.Exit(1) + } + udpTransportStaleTimeout, err := time.ParseDuration(udpTransportStaleTimeoutStr) + if err != nil { + fmt.Fprintf(os.Stderr, "invalid -udp-transport-stale-timeout: %v\n", err) + os.Exit(1) + } udpTimeout, err := time.ParseDuration(udpTimeoutStr) if err != nil { fmt.Fprintf(os.Stderr, "invalid -udp-timeout: %v\n", err) @@ -297,6 +327,26 @@ Known TLS fingerprints for -utls are: fmt.Fprintf(os.Stderr, "-open-stream-timeout (%s) must be greater than 0\n", openStreamTimeout) os.Exit(1) } + if pollDelay <= 0 { + fmt.Fprintf(os.Stderr, "-poll-delay (%s) must be greater than 0\n", pollDelay) + os.Exit(1) + } + if activePollDelay <= 0 { + fmt.Fprintf(os.Stderr, "-active-poll-delay (%s) must be greater than 0\n", activePollDelay) + os.Exit(1) + } + if pollMaxDelay < pollDelay { + fmt.Fprintf(os.Stderr, "-poll-max-delay (%s) must be greater than or equal to -poll-delay (%s)\n", pollMaxDelay, pollDelay) + os.Exit(1) + } + if udpTransportStaleTimeout <= 0 { + fmt.Fprintf(os.Stderr, "-udp-transport-stale-timeout (%s) must be greater than 0\n", udpTransportStaleTimeout) + os.Exit(1) + } + if openStreamFailureLimit <= 0 { + fmt.Fprintf(os.Stderr, "-open-stream-failure-limit (%d) must be greater than 0\n", openStreamFailureLimit) + os.Exit(1) + } if queueSize <= 0 { fmt.Fprintf(os.Stderr, "-queue-size (%d) must be greater than 0\n", queueSize) os.Exit(1) @@ -365,7 +415,6 @@ Known TLS fingerprints for -utls are: log.Warnf("-udp-accept-errors disables forged response filtering; per-query workers will accept the first response regardless of RCODE, which may cause connection failures under DNS injection") } } - // Build tunnel server config. ts, err := client.NewTunnelServer(domainArg, pubkeyHex) if err != nil { @@ -392,10 +441,25 @@ Known TLS fingerprints for -utls are: tunnel.ReconnectMinDelay = reconnectMinDelay tunnel.ReconnectMaxDelay = reconnectMaxDelay tunnel.SessionCheckInterval = sessionCheckInterval + tunnel.PollDelay = pollDelay + tunnel.ActivePollDelay = activePollDelay + tunnel.PollMaxDelay = pollMaxDelay + tunnel.UDPTransportStaleTimeout = udpTransportStaleTimeout + tunnel.OpenStreamFailureLimit = openStreamFailureLimit tunnel.PacketQueueSize = queueSize tunnel.KCPWindowSize = kcpWindowSize tunnel.QueueOverflowMode = queueOverflowMode - log.Infof("transport config: queue-size=%d kcp-window-size=%d queue-overflow=%s", queueSize, kcpWindowSize, queueOverflowMode) + log.Infof( + "transport config: queue-size=%d kcp-window-size=%d queue-overflow=%s poll-delay=%s active-poll-delay=%s poll-max-delay=%s udp-transport-stale-timeout=%s open-stream-failure-limit=%d", + queueSize, + kcpWindowSize, + queueOverflowMode, + pollDelay, + activePollDelay, + pollMaxDelay, + udpTransportStaleTimeout, + openStreamFailureLimit, + ) if compatDnstt { log.Infof("wire config: clientid-size=8 compat=true") diff --git a/vaydns-server/main.go b/vaydns-server/main.go index 71e56ec..360d76d 100644 --- a/vaydns-server/main.go +++ b/vaydns-server/main.go @@ -75,6 +75,12 @@ import ( const ( defaultIdleTimeout = 10 * time.Second defaultKeepAlive = 2 * time.Second + // Keep this comfortably below the default client UDP response timeout and + // low enough for interactive traffic. Long batching delays are acceptable + // for bulk transfer but make chat and proxy workloads feel broken. + defaultResponseDelay = 200 * time.Millisecond + defaultResponseWorkers = 2 + defaultResponseQueueSize = 0 // Bound the pre-smux handshake so half-open KCP sessions cannot linger // indefinitely and consume server resources. defaultHandshakeTimeout = 15 * time.Second @@ -82,16 +88,6 @@ const ( // How to set the TTL field in Answer resource records. responseTTL = 60 - // How long we may wait for downstream data before sending an empty - // response. If another query comes in while we are waiting, we'll send - // an empty response anyway and restart the delay timer for the next - // response. - // - // This number should be less than 2 seconds, which in 2019 was reported - // to be the query timeout of the Quad9 DoH server. - // https://dnsencryption.info/imc19-doe.html Section 4.2, Finding 2.4 - maxResponseDelay = 1 * time.Second - // How long to wait for a TCP connection to upstream to be established. upstreamDialTimeout = 30 * time.Second @@ -125,16 +121,19 @@ var base32Encoding = base32.StdEncoding.WithPadding(base32.NoPadding) // ServerStats tracks query processing statistics. type ServerStats struct { - total uint64 - success uint64 + total uint64 + success uint64 + responseDropped uint64 } -func (s *ServerStats) incTotal() { atomic.AddUint64(&s.total, 1) } -func (s *ServerStats) incSuccess() { atomic.AddUint64(&s.success, 1) } +func (s *ServerStats) incTotal() { atomic.AddUint64(&s.total, 1) } +func (s *ServerStats) incSuccess() { atomic.AddUint64(&s.success, 1) } +func (s *ServerStats) incResponseDropped() { atomic.AddUint64(&s.responseDropped, 1) } func (s *ServerStats) log() { total := atomic.LoadUint64(&s.total) success := atomic.LoadUint64(&s.success) - log.Debugf("stats | total: %d | success: %d", total, success) + responseDropped := atomic.LoadUint64(&s.responseDropped) + log.Debugf("stats | total: %d | success: %d | response_dropped: %d", total, success, responseDropped) } // generateKeypair generates a private key and the corresponding public key. If @@ -236,6 +235,9 @@ func handleStream(stream *smux.Stream, upstream string, conv uint32) error { } defer upstreamConn.Close() upstreamTCPConn := upstreamConn.(*net.TCPConn) + if err := upstreamTCPConn.SetNoDelay(true); err != nil { + log.Debugf("stream %08x:%d upstream TCP_NODELAY: %v", conv, stream.ID(), err) + } var wg sync.WaitGroup wg.Add(2) @@ -338,8 +340,10 @@ func acceptSessions(ln *kcp.Listener, privkey []byte, mtu int, upstream string, log.Infof("session %08x ready", conn.GetConv()) // Permit coalescing the payloads of consecutive sends. conn.SetStreamMode(true) - // Disable the dynamic congestion window (limit only by the - // maximum of local and remote static windows). + // Keep the congestion window disabled, but otherwise stay on KCP's + // conservative timing. The DNS layer itself is the real pacing + // bottleneck on shutdown paths; pushing KCP harder mostly creates + // extra churn. conn.SetNoDelay( 0, // default nodelay 0, // default interval @@ -562,6 +566,18 @@ type record struct { ClientID turbotunnel.ClientID } +func enqueueResponse(ch chan *record, rec *record, stats *ServerStats) bool { + select { + case ch <- rec: + return true + default: + if stats != nil { + stats.incResponseDropped() + } + return false + } +} + // --- Fallback NAT logic for non-DNS packets --- // UDPAddrKey is a comparable struct that can be used as a map key to represent @@ -693,7 +709,7 @@ func (m *FallbackManager) forwardReplies(proxyConn net.PacketConn, clientAddr ne // the incoming DNS queries, and puts them on ttConn's incoming queue. Whenever // a query calls for a response, constructs a partial response and passes it to // sendLoop over ch. Invalid DNS packets are passed to the FallbackManager. -func recvLoop(domain dns.Name, dnsConn net.PacketConn, ttConn *turbotunnel.QueuePacketConn, ch chan<- *record, fallbackMgr *FallbackManager, stats *ServerStats, wireConfig turbotunnel.WireConfig) error { +func recvLoop(domain dns.Name, dnsConn net.PacketConn, ttConn *turbotunnel.QueuePacketConn, ch chan *record, fallbackMgr *FallbackManager, stats *ServerStats, wireConfig turbotunnel.WireConfig) error { for { var buf [4096]byte n, addr, err := dnsConn.ReadFrom(buf[:]) @@ -750,13 +766,9 @@ func recvLoop(domain dns.Name, dnsConn net.PacketConn, ttConn *turbotunnel.Queue } // If a response is called for, pass it to sendLoop via the channel. if resp != nil { - if resp.Rcode() == dns.RcodeNoError { + if enqueueResponse(ch, &record{resp, addr, clientID}, stats) && resp.Rcode() == dns.RcodeNoError { stats.incSuccess() } - select { - case ch <- &record{resp, addr, clientID}: - default: - } } } } @@ -821,7 +833,7 @@ func encodeResponsePayload(rec *record, data []byte, domain dns.Name) error { // response, it sends on the network immediately. Those that represent a // response capable of carrying data, it packs full of as many packets as will // fit while keeping the total size under maxEncodedPayload, then sends it. -func sendLoop(dnsConn net.PacketConn, ttConn *turbotunnel.QueuePacketConn, ch <-chan *record, maxEncodedPayload int, domain dns.Name) error { +func sendLoop(dnsConn net.PacketConn, ttConn *turbotunnel.QueuePacketConn, ch <-chan *record, maxEncodedPayload int, responseDelay time.Duration, domain dns.Name) error { var nextRec *record for { rec := nextRec @@ -857,7 +869,7 @@ func sendLoop(dnsConn net.PacketConn, ttConn *turbotunnel.QueuePacketConn, ch <- // into the response as will fit. Any packet that would // overflow the capacity of the DNS response, we stash // to be bundled into a future response. - timer := time.NewTimer(maxResponseDelay) + timer := time.NewTimer(responseDelay) for { var p []byte unstash := ttConn.Unstash(rec.ClientID) @@ -1125,7 +1137,7 @@ func computeMaxEncodedPayloadMultiRR(limit int, chunkSize int) int { return low } -func run(privkey []byte, domain dns.Name, upstream string, dnsConn net.PacketConn, fallbackAddr *net.UDPAddr, idleTimeout time.Duration, keepAlive time.Duration, queueSize int, kcpWindowSize int, queueOverflowMode turbotunnel.QueueOverflowMode, wireConfig turbotunnel.WireConfig) error { +func run(privkey []byte, domain dns.Name, upstream string, dnsConn net.PacketConn, fallbackAddr *net.UDPAddr, idleTimeout time.Duration, keepAlive time.Duration, queueSize int, kcpWindowSize int, queueOverflowMode turbotunnel.QueueOverflowMode, responseQueueSize int, responseWorkers int, responseDelay time.Duration, wireConfig turbotunnel.WireConfig) error { defer dnsConn.Close() log.Infof("pubkey %x", noise.PubkeyFromPrivkey(privkey)) @@ -1176,7 +1188,17 @@ func run(privkey []byte, domain dns.Name, upstream string, dnsConn net.PacketCon } }() - ch := make(chan *record, 100) + if responseQueueSize <= 0 { + responseQueueSize = queueSize + } + if responseWorkers <= 0 { + responseWorkers = defaultResponseWorkers + } + if responseDelay <= 0 { + responseDelay = defaultResponseDelay + } + + ch := make(chan *record, responseQueueSize) defer close(ch) // Create a fallback manager if an address is specified. @@ -1194,15 +1216,14 @@ func run(privkey []byte, domain dns.Name, upstream string, dnsConn net.PacketCon } }() - // We could run multiple copies of sendLoop; that would allow more time - // for each response to collect downstream data before being evicted by - // another response that needs to be sent. - go func() { - err := sendLoop(dnsConn, ttConn, ch, maxEncodedPayload, domain) - if err != nil { - log.Warnf("sendLoop: %v", err) - } - }() + for i := 0; i < responseWorkers; i++ { + go func() { + err := sendLoop(dnsConn, ttConn, ch, maxEncodedPayload, responseDelay, domain) + if err != nil { + log.Warnf("sendLoop: %v", err) + } + }() + } return recvLoop(domain, dnsConn, ttConn, ch, fallbackMgr, stats, wireConfig) } @@ -1227,6 +1248,9 @@ func main() { var queueSize int var kcpWindowSize int var queueOverflowStr string + var responseQueueSize int + var responseWorkers int + var responseDelayStr string flag.Usage = func() { fmt.Fprintf(flag.CommandLine.Output(), `Usage: @@ -1263,6 +1287,9 @@ Example: flag.IntVar(&queueSize, "queue-size", turbotunnel.QueueSize, "packet queue size for DNS tunnel transport") flag.IntVar(&kcpWindowSize, "kcp-window-size", 0, "KCP send/receive window size in packets (0 = queue-size/2)") flag.StringVar(&queueOverflowStr, "queue-overflow", string(turbotunnel.DefaultQueueOverflowMode), "queue overflow behavior: drop or block") + flag.IntVar(&responseQueueSize, "response-queue-size", defaultResponseQueueSize, "pending DNS response queue size (0 = queue-size)") + flag.IntVar(&responseWorkers, "response-workers", defaultResponseWorkers, "number of DNS response sender workers") + flag.StringVar(&responseDelayStr, "response-delay", defaultResponseDelay.String(), "maximum time to hold a DNS response open for downstream data (e.g. 100ms, 200ms)") var logLevel string flag.StringVar(&logLevel, "log-level", "info", "log level (debug, info, warning, error)") @@ -1413,6 +1440,11 @@ Example: fmt.Fprintf(os.Stderr, "invalid -keepalive: %v\n", err) os.Exit(1) } + responseDelay, err := time.ParseDuration(responseDelayStr) + if err != nil { + fmt.Fprintf(os.Stderr, "invalid -response-delay: %v\n", err) + os.Exit(1) + } if keepAlive >= idleTimeout { fmt.Fprintf(os.Stderr, "-keepalive (%s) must be less than -idle-timeout (%s)\n", keepAlive, idleTimeout) os.Exit(1) @@ -1435,12 +1467,28 @@ Example: fmt.Fprintf(os.Stderr, "-kcp-window-size (%d) must be <= -queue-size (%d)\n", kcpWindowSize, queueSize) os.Exit(1) } + if responseQueueSize < 0 { + fmt.Fprintf(os.Stderr, "-response-queue-size (%d) must be >= 0\n", responseQueueSize) + os.Exit(1) + } + if responseWorkers <= 0 { + fmt.Fprintf(os.Stderr, "-response-workers (%d) must be greater than 0\n", responseWorkers) + os.Exit(1) + } + if responseDelay <= 0 { + fmt.Fprintf(os.Stderr, "-response-delay (%s) must be greater than 0\n", responseDelay) + os.Exit(1) + } queueOverflowMode, err := turbotunnel.ParseQueueOverflowMode(queueOverflowStr) if err != nil { fmt.Fprintf(os.Stderr, "invalid -queue-overflow: %v\n", err) os.Exit(1) } - log.Infof("transport config: queue-size=%d kcp-window-size=%d queue-overflow=%s", queueSize, kcpWindowSize, queueOverflowMode) + effectiveResponseQueueSize := responseQueueSize + if effectiveResponseQueueSize == 0 { + effectiveResponseQueueSize = queueSize + } + log.Infof("transport config: queue-size=%d kcp-window-size=%d queue-overflow=%s response-queue-size=%d response-workers=%d response-delay=%s", queueSize, kcpWindowSize, queueOverflowMode, effectiveResponseQueueSize, responseWorkers, responseDelay) var wireConfig turbotunnel.WireConfig if compatDnstt { @@ -1486,7 +1534,7 @@ Example: } } - err = run(privkey, domain, upstream, dnsConn, fallbackAddr, idleTimeout, keepAlive, queueSize, kcpWindowSize, queueOverflowMode, wireConfig) + err = run(privkey, domain, upstream, dnsConn, fallbackAddr, idleTimeout, keepAlive, queueSize, kcpWindowSize, queueOverflowMode, responseQueueSize, responseWorkers, responseDelay, wireConfig) if err != nil { log.Fatalf("%v", err) }