diff --git a/.gitignore b/.gitignore index 411268e..be38a91 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ slipstrampluss slipstreampluss SlipStreamPlus.tar.gz centralserver +central slipstreamplusb # Rust core binaries (built by CI from source) slipstreamorg/slipstream-client-* diff --git a/cmd/centralserver/main.go b/cmd/centralserver/main.go index bb198e4..a76f053 100644 --- a/cmd/centralserver/main.go +++ b/cmd/centralserver/main.go @@ -39,6 +39,7 @@ type connState struct { txSeq uint32 // next sequence number for reverse data cancel context.CancelFunc created time.Time + lastActive time.Time // last time data was sent or received // Sources: all tunnel connections that can carry reverse data. // We round-robin responses across them (not broadcast). @@ -345,12 +346,14 @@ func (cs *centralServer) handleSYN(frame *tunnel.Frame, source *sourceConn) { targetAddr = fmt.Sprintf("[%s]:%d", net.IP(addr).String(), binary.BigEndian.Uint16(port)) } + now := time.Now() ctx, cancel := context.WithCancel(context.Background()) state := &connState{ - reorderer: tunnel.NewReordererAt(frame.SeqNum + 1), // skip SYN's SeqNum - sources: []*sourceConn{source}, - cancel: cancel, - created: time.Now(), + reorderer: tunnel.NewReordererAt(frame.SeqNum + 1), // skip SYN's SeqNum + sources: []*sourceConn{source}, + cancel: cancel, + created: now, + lastActive: now, } cs.conns[connID] = state cs.mu.Unlock() @@ -475,6 +478,7 @@ func (cs *centralServer) relayUpstreamToTunnel(ctx context.Context, connID uint3 state.mu.Lock() seq := state.txSeq state.txSeq++ + state.lastActive = time.Now() state.mu.Unlock() frame := &tunnel.Frame{ @@ -558,6 +562,7 @@ func (cs *centralServer) handleData(frame *tunnel.Frame, source *sourceConn) { state.sources = append(state.sources, source) } + state.lastActive = time.Now() state.reorderer.Insert(frame.SeqNum, frame.Payload) if state.target == nil { return // buffered, flushed when upstream connects @@ -632,7 +637,7 @@ func (cs *centralServer) closeAll() { } func (cs *centralServer) cleanupLoop() { - ticker := time.NewTicker(30 * time.Second) + ticker := time.NewTicker(15 * time.Second) defer ticker.Stop() for range ticker.C { cs.mu.Lock() @@ -642,16 +647,19 @@ func (cs *centralServer) cleanupLoop() { state.mu.Lock() shouldClean := false - // No upstream established after 2 minutes = stuck - if state.target == nil && now.Sub(state.created) > 2*time.Minute { + // No upstream established after 60 seconds = stuck + if state.target == nil && now.Sub(state.created) > 60*time.Second { shouldClean = true } // No sources left = all tunnel connections died if len(state.sources) == 0 && now.Sub(state.created) > 30*time.Second { shouldClean = true } - // No max lifetime — long-lived connections (downloads, streams) - // are valid. Cleanup only based on actual broken state above. + // Idle for too long — no data sent or received in 60 seconds. + // This catches stuck connections where both sides stopped talking. + if now.Sub(state.lastActive) > 60*time.Second { + shouldClean = true + } state.mu.Unlock() if shouldClean { diff --git a/internal/proxy/socks5.go b/internal/proxy/socks5.go index 17f5931..6c01164 100644 --- a/internal/proxy/socks5.go +++ b/internal/proxy/socks5.go @@ -237,6 +237,12 @@ func (s *Server) handleConnection(clientConn net.Conn, connID uint64) { // handlePacketSplit handles a connection using packet-level load balancing. func (s *Server) handlePacketSplit(clientConn net.Conn, connID uint64, atyp byte, addrBytes, portBytes []byte, user *users.User) { + // Fast-fail if no tunnels are connected yet (e.g., right after restart) + if !s.tunnelPool.HasTunnels() { + clientConn.Write([]byte{0x05, 0x01, 0x00, 0x01, 0, 0, 0, 0, 0, 0}) + return + } + healthy := s.manager.HealthyInstances() socksHealthy := make([]*engine.Instance, 0, len(healthy)) for _, inst := range healthy { @@ -249,15 +255,12 @@ func (s *Server) handlePacketSplit(clientConn net.Conn, connID uint64, atyp byte return } - // Track connections on all used instances - for _, inst := range socksHealthy { - inst.IncrConns() - } - defer func() { - for _, inst := range socksHealthy { - inst.DecrConns() - } - }() + // Track connection count on ONE instance only (first healthy one). + // In packet_split, the single user connection is multiplexed across all + // instances, so incrementing all of them inflates the count by N×. + trackInst := socksHealthy[0] + trackInst.IncrConns() + defer trackInst.DecrConns() // Create a packet splitter for this connection tunnelConnID := s.connIDGen.Next() diff --git a/internal/tunnel/pool.go b/internal/tunnel/pool.go index df0e070..4fbef24 100644 --- a/internal/tunnel/pool.go +++ b/internal/tunnel/pool.go @@ -1,6 +1,7 @@ package tunnel import ( + "context" "fmt" "io" "log" @@ -41,6 +42,8 @@ type TunnelPool struct { handlers sync.Map // ConnID (uint32) → chan *Frame stopCh chan struct{} wg sync.WaitGroup + ready chan struct{} // closed when at least one tunnel is connected + readyOnce sync.Once } func NewTunnelPool(mgr *engine.Manager) *TunnelPool { @@ -48,9 +51,28 @@ func NewTunnelPool(mgr *engine.Manager) *TunnelPool { mgr: mgr, tunnels: make(map[int]*TunnelConn), stopCh: make(chan struct{}), + ready: make(chan struct{}), } } +// WaitReady blocks until at least one tunnel is connected, or ctx is cancelled. +func (p *TunnelPool) WaitReady(ctx context.Context) bool { + select { + case <-p.ready: + return true + case <-ctx.Done(): + return false + } +} + +// HasTunnels returns true if at least one tunnel is currently connected. +func (p *TunnelPool) HasTunnels() bool { + p.mu.RLock() + n := len(p.tunnels) + p.mu.RUnlock() + return n > 0 +} + func (p *TunnelPool) Start() { p.refreshConnections() @@ -162,6 +184,14 @@ func (p *TunnelPool) refreshConnections() { log.Printf("[tunnel-pool] connected to instance %d (%s:%d)", inst.ID(), inst.Config.Domain, inst.Config.Port) } + + // Signal readiness once we have at least one tunnel + if len(p.tunnels) > 0 { + p.readyOnce.Do(func() { + close(p.ready) + log.Printf("[tunnel-pool] ready (%d tunnels connected)", len(p.tunnels)) + }) + } } func (p *TunnelPool) connectInstance(inst *engine.Instance) (*TunnelConn, error) { @@ -230,12 +260,12 @@ func (p *TunnelPool) readLoop(tc *TunnelConn) { ch := v.(chan *Frame) select { case ch <- frame: - case <-time.After(5 * time.Second): - // Buffer full for too long — connection is stuck, log and drop - log.Printf("[tunnel-pool] instance %d: frame buffer full for conn=%d, dropping frame seq=%d", + default: + // Drop immediately — MUST NOT block readLoop because it serves + // ALL ConnIDs on this tunnel. Blocking here stalls every other + // connection sharing this tunnel. + log.Printf("[tunnel-pool] instance %d: dropping frame conn=%d seq=%d (buffer full)", tc.inst.ID(), frame.ConnID, frame.SeqNum) - case <-p.stopCh: - return } } } diff --git a/internal/tunnel/splitter.go b/internal/tunnel/splitter.go index 9577b36..eea6924 100644 --- a/internal/tunnel/splitter.go +++ b/internal/tunnel/splitter.go @@ -121,15 +121,46 @@ func (ps *PacketSplitter) RelayUpstreamToClient(ctx context.Context, client io.W reorderer := NewReorderer() var totalBytes int64 + // Periodic ticker to: + // 1. Drive gap-skip timeouts even when no new frames arrive + // 2. Detect idle connections (no data for idleLimit → give up) + gapTicker := time.NewTicker(500 * time.Millisecond) + defer gapTicker.Stop() + + const idleLimit = 30 * time.Second + lastActivity := time.Now() + for { select { case <-ctx.Done(): return totalBytes + + case <-gapTicker.C: + // Periodically drive gap-skip even without new frames + for { + data := reorderer.Next() + if data == nil { + break + } + n, err := client.Write(data) + totalBytes += int64(n) + lastActivity = time.Now() + if err != nil { + return totalBytes + } + } + // Idle timeout: if nothing happened for too long, give up + if time.Since(lastActivity) > idleLimit { + return totalBytes + } + case frame, ok := <-ps.incoming: if !ok { return totalBytes } + lastActivity = time.Now() + if frame.IsFIN() || frame.IsRST() { for { data := reorderer.Next()