From 7b4badc80f4c1d8597e9e63e4bf966496b395bed Mon Sep 17 00:00:00 2001
From: ParsaKSH
Date: Sun, 29 Mar 2026 05:09:33 +0330
Subject: [PATCH] =?UTF-8?q?fix:=20resolve=20packet=5Fsplit=20instability?=
 =?UTF-8?q?=20=E2=80=94=20connection=20leak,=20idle=20hang,=20tunnel=20sta?=
 =?UTF-8?q?ll?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Five root causes of degradation under load and after restart:

1. readLoop 5s backpressure blocked entire tunnel per stuck connection:
   one ConnID with full buffer caused ALL other ConnIDs on the same
   TunnelConn to stall for 5 seconds per frame. Reverted to immediate
   drop (with log) so readLoop never blocks.

2. Reorderer gap-skip only triggered on new frame arrival: if no more
   frames arrived, Next() was never called again and the timer never
   expired — connection hung forever. Added 500ms gapTicker in
   RelayUpstreamToClient to drive gap-skip and detect idle connections
   (30s idle → give up and close).

3. CentralServer had no idle-based cleanup after 5-min hard limit was
   removed: stuck connections accumulated indefinitely. Added lastActive
   tracking with 60s idle eviction, checked every 15 seconds. The
   "no upstream established" cleanup threshold is also shortened from
   2 minutes to 60 seconds.

4. IncrConns() called on all N instances per user connection inflated
   dashboard count by N× (40 users × 10 instances = 400 shown, not 40).
   Fixed to track on one instance only.

5. After restart, user connections arrived before any tunnel was ready,
   causing immediate SYN failures and reconnect storms. Added
   HasTunnels() fast-fail and WaitReady() helper on TunnelPool.

Co-Authored-By: Claude Opus 4.6 --- .gitignore | 1 + cmd/centralserver/main.go | 26 +++++++++++++++--------- internal/proxy/socks5.go | 21 ++++++++++--------- internal/tunnel/pool.go | 40 ++++++++++++++++++++++++++++++++----- internal/tunnel/splitter.go | 31 ++++++++++++++++++++++++++++ 5 files changed, 96 insertions(+), 23 deletions(-) diff --git a/.gitignore b/.gitignore index 411268e..be38a91 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ slipstrampluss slipstreampluss SlipStreamPlus.tar.gz centralserver +central slipstreamplusb # Rust core binaries (built by CI from source) slipstreamorg/slipstream-client-* diff --git a/cmd/centralserver/main.go b/cmd/centralserver/main.go index bb198e4..a76f053 100644 --- a/cmd/centralserver/main.go +++ b/cmd/centralserver/main.go @@ -39,6 +39,7 @@ type connState struct { txSeq uint32 // next sequence number for reverse data cancel context.CancelFunc created time.Time + lastActive time.Time // last time data was sent or received // Sources: all tunnel connections that can carry reverse data. // We round-robin responses across them (not broadcast). 
@@ -345,12 +346,14 @@ func (cs *centralServer) handleSYN(frame *tunnel.Frame, source *sourceConn) { targetAddr = fmt.Sprintf("[%s]:%d", net.IP(addr).String(), binary.BigEndian.Uint16(port)) } + now := time.Now() ctx, cancel := context.WithCancel(context.Background()) state := &connState{ - reorderer: tunnel.NewReordererAt(frame.SeqNum + 1), // skip SYN's SeqNum - sources: []*sourceConn{source}, - cancel: cancel, - created: time.Now(), + reorderer: tunnel.NewReordererAt(frame.SeqNum + 1), // skip SYN's SeqNum + sources: []*sourceConn{source}, + cancel: cancel, + created: now, + lastActive: now, } cs.conns[connID] = state cs.mu.Unlock() @@ -475,6 +478,7 @@ func (cs *centralServer) relayUpstreamToTunnel(ctx context.Context, connID uint3 state.mu.Lock() seq := state.txSeq state.txSeq++ + state.lastActive = time.Now() state.mu.Unlock() frame := &tunnel.Frame{ @@ -558,6 +562,7 @@ func (cs *centralServer) handleData(frame *tunnel.Frame, source *sourceConn) { state.sources = append(state.sources, source) } + state.lastActive = time.Now() state.reorderer.Insert(frame.SeqNum, frame.Payload) if state.target == nil { return // buffered, flushed when upstream connects @@ -632,7 +637,7 @@ func (cs *centralServer) closeAll() { } func (cs *centralServer) cleanupLoop() { - ticker := time.NewTicker(30 * time.Second) + ticker := time.NewTicker(15 * time.Second) defer ticker.Stop() for range ticker.C { cs.mu.Lock() @@ -642,16 +647,19 @@ func (cs *centralServer) cleanupLoop() { state.mu.Lock() shouldClean := false - // No upstream established after 2 minutes = stuck - if state.target == nil && now.Sub(state.created) > 2*time.Minute { + // No upstream established after 60 seconds = stuck + if state.target == nil && now.Sub(state.created) > 60*time.Second { shouldClean = true } // No sources left = all tunnel connections died if len(state.sources) == 0 && now.Sub(state.created) > 30*time.Second { shouldClean = true } - // No max lifetime — long-lived connections (downloads, streams) - // 
are valid. Cleanup only based on actual broken state above. + // Idle for too long — no data sent or received in 60 seconds. + // This catches stuck connections where both sides stopped talking. + if now.Sub(state.lastActive) > 60*time.Second { + shouldClean = true + } state.mu.Unlock() if shouldClean { diff --git a/internal/proxy/socks5.go b/internal/proxy/socks5.go index 17f5931..6c01164 100644 --- a/internal/proxy/socks5.go +++ b/internal/proxy/socks5.go @@ -237,6 +237,12 @@ func (s *Server) handleConnection(clientConn net.Conn, connID uint64) { // handlePacketSplit handles a connection using packet-level load balancing. func (s *Server) handlePacketSplit(clientConn net.Conn, connID uint64, atyp byte, addrBytes, portBytes []byte, user *users.User) { + // Fast-fail if no tunnels are connected yet (e.g., right after restart) + if !s.tunnelPool.HasTunnels() { + clientConn.Write([]byte{0x05, 0x01, 0x00, 0x01, 0, 0, 0, 0, 0, 0}) + return + } + healthy := s.manager.HealthyInstances() socksHealthy := make([]*engine.Instance, 0, len(healthy)) for _, inst := range healthy { @@ -249,15 +255,12 @@ func (s *Server) handlePacketSplit(clientConn net.Conn, connID uint64, atyp byte return } - // Track connections on all used instances - for _, inst := range socksHealthy { - inst.IncrConns() - } - defer func() { - for _, inst := range socksHealthy { - inst.DecrConns() - } - }() + // Track connection count on ONE instance only (first healthy one). + // In packet_split, the single user connection is multiplexed across all + // instances, so incrementing all of them inflates the count by N×. 
+ trackInst := socksHealthy[0] + trackInst.IncrConns() + defer trackInst.DecrConns() // Create a packet splitter for this connection tunnelConnID := s.connIDGen.Next() diff --git a/internal/tunnel/pool.go b/internal/tunnel/pool.go index df0e070..4fbef24 100644 --- a/internal/tunnel/pool.go +++ b/internal/tunnel/pool.go @@ -1,6 +1,7 @@ package tunnel import ( + "context" "fmt" "io" "log" @@ -41,6 +42,8 @@ type TunnelPool struct { handlers sync.Map // ConnID (uint32) → chan *Frame stopCh chan struct{} wg sync.WaitGroup + ready chan struct{} // closed when at least one tunnel is connected + readyOnce sync.Once } func NewTunnelPool(mgr *engine.Manager) *TunnelPool { @@ -48,9 +51,28 @@ func NewTunnelPool(mgr *engine.Manager) *TunnelPool { mgr: mgr, tunnels: make(map[int]*TunnelConn), stopCh: make(chan struct{}), + ready: make(chan struct{}), } } +// WaitReady blocks until at least one tunnel is connected, or ctx is cancelled. +func (p *TunnelPool) WaitReady(ctx context.Context) bool { + select { + case <-p.ready: + return true + case <-ctx.Done(): + return false + } +} + +// HasTunnels returns true if at least one tunnel is currently connected. 
+func (p *TunnelPool) HasTunnels() bool { + p.mu.RLock() + n := len(p.tunnels) + p.mu.RUnlock() + return n > 0 +} + func (p *TunnelPool) Start() { p.refreshConnections() @@ -162,6 +184,14 @@ func (p *TunnelPool) refreshConnections() { log.Printf("[tunnel-pool] connected to instance %d (%s:%d)", inst.ID(), inst.Config.Domain, inst.Config.Port) } + + // Signal readiness once we have at least one tunnel + if len(p.tunnels) > 0 { + p.readyOnce.Do(func() { + close(p.ready) + log.Printf("[tunnel-pool] ready (%d tunnels connected)", len(p.tunnels)) + }) + } } func (p *TunnelPool) connectInstance(inst *engine.Instance) (*TunnelConn, error) { @@ -230,12 +260,12 @@ func (p *TunnelPool) readLoop(tc *TunnelConn) { ch := v.(chan *Frame) select { case ch <- frame: - case <-time.After(5 * time.Second): - // Buffer full for too long — connection is stuck, log and drop - log.Printf("[tunnel-pool] instance %d: frame buffer full for conn=%d, dropping frame seq=%d", + default: + // Drop immediately — MUST NOT block readLoop because it serves + // ALL ConnIDs on this tunnel. Blocking here stalls every other + // connection sharing this tunnel. + log.Printf("[tunnel-pool] instance %d: dropping frame conn=%d seq=%d (buffer full)", tc.inst.ID(), frame.ConnID, frame.SeqNum) - case <-p.stopCh: - return } } } diff --git a/internal/tunnel/splitter.go b/internal/tunnel/splitter.go index 9577b36..eea6924 100644 --- a/internal/tunnel/splitter.go +++ b/internal/tunnel/splitter.go @@ -121,15 +121,46 @@ func (ps *PacketSplitter) RelayUpstreamToClient(ctx context.Context, client io.W reorderer := NewReorderer() var totalBytes int64 + // Periodic ticker to: + // 1. Drive gap-skip timeouts even when no new frames arrive + // 2. 
Detect idle connections (no data for idleLimit → give up) + gapTicker := time.NewTicker(500 * time.Millisecond) + defer gapTicker.Stop() + + const idleLimit = 30 * time.Second + lastActivity := time.Now() + for { select { case <-ctx.Done(): return totalBytes + + case <-gapTicker.C: + // Periodically drive gap-skip even without new frames + for { + data := reorderer.Next() + if data == nil { + break + } + n, err := client.Write(data) + totalBytes += int64(n) + lastActivity = time.Now() + if err != nil { + return totalBytes + } + } + // Idle timeout: if nothing happened for too long, give up + if time.Since(lastActivity) > idleLimit { + return totalBytes + } + case frame, ok := <-ps.incoming: if !ok { return totalBytes } + lastActivity = time.Now() + if frame.IsFIN() || frame.IsRST() { for { data := reorderer.Next()