Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,13 @@ sudo ip6tables -t nat -I PREROUTING -i eth0 -p udp --dport 53 -j REDIRECT --to-p
| `-queue-size N` | Packet queue size for transport and DNS layers | `512` |
| `-kcp-window-size N` | KCP send/receive window size in packets (0 = queue-size/2) | `0` |
| `-queue-overflow MODE` | Queue overflow behavior: `drop` (silent discard) or `block` (backpressure) | `drop` |
| `-response-queue-size N` | Pending DNS response queue size (0 = `queue-size`) | `0` |
| `-response-workers N` | Number of DNS response sender workers | `2` |
| `-response-delay D` | Max time to hold a DNS response open while waiting for downstream data | `200ms` |
| `-log-level LEVEL` | Log level: debug, info, warning, error | `info` |

> **Note:** The server response queue is drop-based. When it fills, pending DNS responses are dropped and counted in the debug stats log as `response_dropped`.

### Client flags

#### Transport (pick one)
Expand Down Expand Up @@ -168,8 +173,11 @@ sudo ip6tables -t nat -I PREROUTING -i eth0 -p udp --dport 53 -j REDIRECT --to-p
| `-keepalive D` | Keepalive ping interval (must match server, must be < idle-timeout) | `2s` |
| `-max-streams N` | Max concurrent streams per session (0 = unlimited) | `0` |
| `-open-stream-timeout D` | Timeout for opening an smux stream | `10s` |
| `-open-stream-failure-limit N` | Retire an idle session after this many consecutive stream-open failures | `3` |
| `-reconnect-min D` | Initial backoff delay for session reconnect | `1s` |
| `-reconnect-max D` | Max backoff delay (must be >= reconnect-min) | `30s` |
| `-session-check-interval D` | How often the managed client checks session and transport health | `500ms` |
| `-udp-transport-stale-timeout D` | Retire the current session if per-query UDP sees no valid response for this long while streams need transport | `3s` |

> **Note:** `idle-timeout` and `keepalive` must be set to the same values on both client and server — mismatched values will cause one side to close the session before the other detects it. Keep `keepalive` well below `idle-timeout` (the default 5x ratio allows ~5 ping attempts before timeout).
>
Expand All @@ -185,6 +193,9 @@ These flags only apply when using `-udp`. By default, each query is sent from a
| `-udp-timeout D` | Per-query response timeout — the total time a worker waits for a valid (NOERROR) response. Forged responses are discarded but the deadline is not extended — if no valid response arrives within this window, the query is abandoned. | `500ms` |
| `-udp-shared-socket` | Use a single shared UDP socket instead of per-query sockets. By default, each query is sent from a new socket with a random ephemeral source port, making the tunnel harder to fingerprint or block by port. With this flag, all queries share one socket and source port for the lifetime of the client — blocking that port kills the tunnel. | `false` |
| `-udp-accept-errors` | In per-query mode, accept the first DNS response regardless of RCODE instead of waiting for a NOERROR response. This disables forged response filtering — the worker stops waiting after the first forged response, so the real response is likely lost. Only useful for debugging; not recommended in production. Ignored when `-udp-shared-socket` is set. | `false` |
| `-poll-delay D` | Base delay before sending an empty DNS poll when idle | `500ms` |
| `-active-poll-delay D` | Poll delay cap while streams are active or being opened | `200ms` |
| `-poll-max-delay D` | Max idle backoff between empty DNS polls | `2s` |

#### Queue and KCP tuning

Expand Down
166 changes: 138 additions & 28 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"net"
"net/http"
"sync"
"sync/atomic"
"syscall"
"time"

Expand All @@ -41,16 +42,21 @@ import (

// Default timeouts for VayDNS mode.
const (
DefaultIdleTimeout = 10 * time.Second
DefaultKeepAlive = 2 * time.Second
DefaultOpenStreamTimeout = 10 * time.Second
DefaultReconnectDelay = 1 * time.Second
DefaultReconnectMaxDelay = 30 * time.Second
DefaultSessionCheckInterval = 500 * time.Millisecond
DefaultUDPResponseTimeout = 500 * time.Millisecond
DefaultUDPWorkers = 100
DefaultMaxStreams = 0 // unlimited
DefaultHandshakeTimeout = 15 * time.Second
DefaultIdleTimeout = 10 * time.Second
DefaultKeepAlive = 2 * time.Second
DefaultOpenStreamTimeout = 10 * time.Second
DefaultReconnectDelay = 1 * time.Second
DefaultReconnectMaxDelay = 30 * time.Second
DefaultSessionCheckInterval = 500 * time.Millisecond
DefaultUDPResponseTimeout = 500 * time.Millisecond
DefaultUDPWorkers = 100
DefaultMaxStreams = 0 // unlimited
DefaultHandshakeTimeout = 15 * time.Second
DefaultPollDelay = 500 * time.Millisecond
DefaultActivePollDelay = 200 * time.Millisecond
DefaultPollMaxDelay = 2 * time.Second
DefaultUDPTransportStaleTimeout = 3 * time.Second
DefaultOpenStreamFailureLimit = 3
)

// Default timeouts for dnstt compatibility mode.
Expand Down Expand Up @@ -194,17 +200,22 @@ type Tunnel struct {
TunnelServer TunnelServer

// Session configuration. Zero values use defaults.
IdleTimeout time.Duration // default: 10s (2m with DnsttCompat)
KeepAlive time.Duration // default: 2s (10s with DnsttCompat)
OpenStreamTimeout time.Duration // default: 10s
MaxStreams int // default: 0 (0 = unlimited)
ReconnectMinDelay time.Duration // default: 1s
ReconnectMaxDelay time.Duration // default: 30s
SessionCheckInterval time.Duration // default: 500ms
HandshakeTimeout time.Duration // default: 15s
PacketQueueSize int // default: QueueSize (512)
KCPWindowSize int // default: PacketQueueSize/2
QueueOverflowMode turbotunnel.QueueOverflowMode // default: drop
IdleTimeout time.Duration // default: 10s (2m with DnsttCompat)
KeepAlive time.Duration // default: 2s (10s with DnsttCompat)
OpenStreamTimeout time.Duration // default: 10s
MaxStreams int // default: 0 (0 = unlimited)
ReconnectMinDelay time.Duration // default: 1s
ReconnectMaxDelay time.Duration // default: 30s
SessionCheckInterval time.Duration // default: 500ms
HandshakeTimeout time.Duration // default: 15s
PacketQueueSize int // default: QueueSize (512)
KCPWindowSize int // default: PacketQueueSize/2
QueueOverflowMode turbotunnel.QueueOverflowMode // default: drop
PollDelay time.Duration // default: 500ms
ActivePollDelay time.Duration // default: 200ms
PollMaxDelay time.Duration // default: 2s
UDPTransportStaleTimeout time.Duration // default: 3s (UDP per-query only)
OpenStreamFailureLimit int // default: 3 consecutive idle failures

// internal state
wireConfig turbotunnel.WireConfig
Expand All @@ -215,6 +226,7 @@ type Tunnel struct {
noiseChannel io.ReadWriteCloser
smuxSession *smux.Session
remoteAddr net.Addr
activeStreams atomic.Int32
}

// NewTunnel creates a Tunnel with the given resolver and server configuration.
Expand Down Expand Up @@ -263,6 +275,21 @@ func (t *Tunnel) applyDefaults() {
if t.HandshakeTimeout == 0 {
t.HandshakeTimeout = DefaultHandshakeTimeout
}
if t.PollDelay == 0 {
t.PollDelay = DefaultPollDelay
}
if t.ActivePollDelay == 0 {
t.ActivePollDelay = DefaultActivePollDelay
}
if t.PollMaxDelay == 0 {
t.PollMaxDelay = DefaultPollMaxDelay
}
if t.UDPTransportStaleTimeout == 0 {
t.UDPTransportStaleTimeout = DefaultUDPTransportStaleTimeout
}
if t.OpenStreamFailureLimit == 0 {
t.OpenStreamFailureLimit = DefaultOpenStreamFailureLimit
}
}

func (t *Tunnel) effectivePacketQueueSize() int {
Expand Down Expand Up @@ -370,13 +397,30 @@ func (t *Tunnel) InitiateResolverConnection() error {

// InitiateDNSPacketConn wraps the resolver connection with DNS encoding.
func (t *Tunnel) InitiateDNSPacketConn(domain dns.Name) error {
t.applyDefaults()
var rateLimiter *RateLimiter
if t.TunnelServer.RPS > 0 {
rateLimiter = NewRateLimiter(t.TunnelServer.RPS)
}
maxQnameLen := t.TunnelServer.effectiveMaxQnameLen()
rrType := t.TunnelServer.effectiveRRType()
t.dnsPacketConn = NewDNSPacketConn(t.resolverConn, t.remoteAddr, domain, rateLimiter, maxQnameLen, t.TunnelServer.MaxNumLabels, t.wireConfig, t.forgedStats, rrType, t.effectivePacketQueueSize(), t.effectiveQueueOverflowMode())
t.dnsPacketConn = newDNSPacketConn(
t.resolverConn,
t.remoteAddr,
domain,
rateLimiter,
maxQnameLen,
t.TunnelServer.MaxNumLabels,
t.wireConfig,
t.forgedStats,
rrType,
&t.activeStreams,
t.PollDelay,
t.ActivePollDelay,
t.PollMaxDelay,
t.effectivePacketQueueSize(),
t.effectiveQueueOverflowMode(),
)
return nil
}

Expand Down Expand Up @@ -525,6 +569,12 @@ func (t *Tunnel) OpenStream() (net.Conn, error) {

// Handle forwards data between a local TCP connection and a tunnel stream.
func (t *Tunnel) Handle(lconn *net.TCPConn) error {
if err := lconn.SetNoDelay(true); err != nil {
log.Debugf("local TCP_NODELAY: %v", err)
}
t.activeStreams.Add(1)
defer t.activeStreams.Add(-1)

stream, err := t.OpenStream()
if err != nil {
return err
Expand Down Expand Up @@ -603,6 +653,24 @@ func (t *Tunnel) resetTransportLayers() error {
return nil
}

func (t *Tunnel) udpTransportStaleAge(requireTraffic bool) time.Duration {
if !requireTraffic || t.UDPTransportStaleTimeout <= 0 {
return 0
}
if t.Resolver.ResolverType != ResolverTypeUDP || t.Resolver.UDPSharedSocket {
return 0
}
udpConn, ok := t.resolverConn.(*UDPPacketConn)
if !ok {
return 0
}
lastSuccess := udpConn.lastSuccessTime()
if lastSuccess.IsZero() {
return 0
}
return time.Since(lastSuccess)
}

// ListenAndServe starts a TCP listener and forwards connections through the
// tunnel with automatic session reconnection. This is the main entry point
// for the CLI.
Expand Down Expand Up @@ -659,9 +727,24 @@ func (t *Tunnel) ListenAndServe(listenAddr string) error {
for {
conn, sess, err = t.createSession(mtu)
if err == nil {
log.Infof("session %08x ready", conn.GetConv())
break
}
log.Warnf("session creation failed: %v; rebuilding transport and retrying in %v", err, delay)
t.closeTransportLayers()
for {
if err := t.resetTransportLayers(); err != nil {
log.Warnf("transport rebuild failed: %v; retrying in %v", err, delay)
time.Sleep(delay)
delay *= 2
if delay > t.ReconnectMaxDelay {
delay = t.ReconnectMaxDelay
}
continue
}
transportErrCh = t.dnsPacketConn.TransportErrors()
break
}
log.Warnf("session creation failed: %v; retrying in %v", err, delay)
time.Sleep(delay)
delay *= 2
if delay > t.ReconnectMaxDelay {
Expand All @@ -671,13 +754,19 @@ func (t *Tunnel) ListenAndServe(listenAddr string) error {

sessDone := sess.CloseChan()
conv := conn.GetConv()
var openFailCount atomic.Int32

sessionAlive := true
for sessionAlive {
ln.SetDeadline(time.Now().Add(t.SessionCheckInterval))
local, err := ln.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Timeout() {
if age := t.udpTransportStaleAge(t.activeStreams.Load() > 0); age > t.UDPTransportStaleTimeout {
log.Warnf("session %08x transport stale after %v with %d active streams", conv, age, t.activeStreams.Load())
sessionAlive = false
continue
}
select {
case <-sessDone:
sessionAlive = false
Expand Down Expand Up @@ -706,18 +795,31 @@ func (t *Tunnel) ListenAndServe(listenAddr string) error {
continue
default:
}
if age := t.udpTransportStaleAge(t.activeStreams.Load() > 0 || openFailCount.Load() > 0); age > t.UDPTransportStaleTimeout {
log.Warnf("session %08x transport stale after %v while streams need transport", conv, age)
local.Close()
sessionAlive = false
continue
}

go func(sess *smux.Session, conv uint32) {
go func(local *net.TCPConn, sess *smux.Session, conv uint32, openFailCount *atomic.Int32) {
if sem != nil {
sem <- struct{}{}
defer func() { <-sem }()
}
defer local.Close()
err := t.handleConn(local.(*net.TCPConn), sess, conv)
err := t.handleConn(local, sess, conv, openFailCount)
if err != nil {
log.Warnf("handle: %v", err)
if t.activeStreams.Load() == 0 {
failures := openFailCount.Add(1)
if failures >= int32(t.OpenStreamFailureLimit) && !sess.IsClosed() {
log.Warnf("session %08x retiring idle session after %d consecutive stream-open failures", conv, failures)
sess.Close()
}
}
}
}(sess, conv)
}(local.(*net.TCPConn), sess, conv, &openFailCount)
}

log.Warnf("session %08x closed, reconnecting", conv)
Expand All @@ -733,7 +835,6 @@ func (t *Tunnel) createSession(mtu int) (*kcp.UDPSession, *smux.Session, error)
if err != nil {
return nil, nil, fmt.Errorf("opening KCP conn: %v", err)
}
log.Infof("session %08x ready", conn.GetConv())
conn.SetStreamMode(true)
conn.SetNoDelay(0, 0, 0, 1)
conn.SetWindowSize(t.effectiveKCPWindowSize(), t.effectiveKCPWindowSize())
Expand Down Expand Up @@ -763,11 +864,20 @@ func (t *Tunnel) createSession(mtu int) (*kcp.UDPSession, *smux.Session, error)
}

// handleConn forwards a single TCP connection through the tunnel session.
func (t *Tunnel) handleConn(local *net.TCPConn, sess *smux.Session, conv uint32) error {
func (t *Tunnel) handleConn(local *net.TCPConn, sess *smux.Session, conv uint32, openFailCount *atomic.Int32) error {
if err := local.SetNoDelay(true); err != nil {
log.Debugf("stream %08x local TCP_NODELAY: %v", conv, err)
}
t.activeStreams.Add(1)
defer t.activeStreams.Add(-1)

stream, err := openStreamWithTimeout(conv, t.OpenStreamTimeout, sess.OpenStream)
if err != nil {
return err
}
if openFailCount != nil {
openFailCount.Store(0)
}

defer func() {
log.Debugf("stream %08x:%d closed", conv, stream.ID())
Expand Down
Loading
Loading