Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ slipstrampluss
slipstreampluss
SlipStreamPlus.tar.gz
centralserver
central
slipstreamplusb
# Rust core binaries (built by CI from source)
slipstreamorg/slipstream-client-*
Expand Down
26 changes: 17 additions & 9 deletions cmd/centralserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type connState struct {
txSeq uint32 // next sequence number for reverse data
cancel context.CancelFunc
created time.Time
lastActive time.Time // last time data was sent or received

// Sources: all tunnel connections that can carry reverse data.
// We round-robin responses across them (not broadcast).
Expand Down Expand Up @@ -345,12 +346,14 @@ func (cs *centralServer) handleSYN(frame *tunnel.Frame, source *sourceConn) {
targetAddr = fmt.Sprintf("[%s]:%d", net.IP(addr).String(), binary.BigEndian.Uint16(port))
}

now := time.Now()
ctx, cancel := context.WithCancel(context.Background())
state := &connState{
reorderer: tunnel.NewReordererAt(frame.SeqNum + 1), // skip SYN's SeqNum
sources: []*sourceConn{source},
cancel: cancel,
created: time.Now(),
reorderer: tunnel.NewReordererAt(frame.SeqNum + 1), // skip SYN's SeqNum
sources: []*sourceConn{source},
cancel: cancel,
created: now,
lastActive: now,
}
cs.conns[connID] = state
cs.mu.Unlock()
Expand Down Expand Up @@ -475,6 +478,7 @@ func (cs *centralServer) relayUpstreamToTunnel(ctx context.Context, connID uint3
state.mu.Lock()
seq := state.txSeq
state.txSeq++
state.lastActive = time.Now()
state.mu.Unlock()

frame := &tunnel.Frame{
Expand Down Expand Up @@ -558,6 +562,7 @@ func (cs *centralServer) handleData(frame *tunnel.Frame, source *sourceConn) {
state.sources = append(state.sources, source)
}

state.lastActive = time.Now()
state.reorderer.Insert(frame.SeqNum, frame.Payload)
if state.target == nil {
return // buffered, flushed when upstream connects
Expand Down Expand Up @@ -632,7 +637,7 @@ func (cs *centralServer) closeAll() {
}

func (cs *centralServer) cleanupLoop() {
ticker := time.NewTicker(30 * time.Second)
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()
for range ticker.C {
cs.mu.Lock()
Expand All @@ -642,16 +647,19 @@ func (cs *centralServer) cleanupLoop() {
state.mu.Lock()
shouldClean := false

// No upstream established after 2 minutes = stuck
if state.target == nil && now.Sub(state.created) > 2*time.Minute {
// No upstream established after 60 seconds = stuck
if state.target == nil && now.Sub(state.created) > 60*time.Second {
shouldClean = true
}
// No sources left = all tunnel connections died
if len(state.sources) == 0 && now.Sub(state.created) > 30*time.Second {
shouldClean = true
}
// No max lifetime — long-lived connections (downloads, streams)
// are valid. Cleanup only based on actual broken state above.
// Idle for too long — no data sent or received in 60 seconds.
// This catches stuck connections where both sides stopped talking.
if now.Sub(state.lastActive) > 60*time.Second {
shouldClean = true
}

state.mu.Unlock()
if shouldClean {
Expand Down
21 changes: 12 additions & 9 deletions internal/proxy/socks5.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,12 @@ func (s *Server) handleConnection(clientConn net.Conn, connID uint64) {

// handlePacketSplit handles a connection using packet-level load balancing.
func (s *Server) handlePacketSplit(clientConn net.Conn, connID uint64, atyp byte, addrBytes, portBytes []byte, user *users.User) {
// Fast-fail if no tunnels are connected yet (e.g., right after restart)
if !s.tunnelPool.HasTunnels() {
clientConn.Write([]byte{0x05, 0x01, 0x00, 0x01, 0, 0, 0, 0, 0, 0})
return
}

healthy := s.manager.HealthyInstances()
socksHealthy := make([]*engine.Instance, 0, len(healthy))
for _, inst := range healthy {
Expand All @@ -249,15 +255,12 @@ func (s *Server) handlePacketSplit(clientConn net.Conn, connID uint64, atyp byte
return
}

// Track connections on all used instances
for _, inst := range socksHealthy {
inst.IncrConns()
}
defer func() {
for _, inst := range socksHealthy {
inst.DecrConns()
}
}()
// Track connection count on ONE instance only (first healthy one).
// In packet_split, the single user connection is multiplexed across all
// instances, so incrementing all of them inflates the count by N×.
trackInst := socksHealthy[0]
trackInst.IncrConns()
defer trackInst.DecrConns()

// Create a packet splitter for this connection
tunnelConnID := s.connIDGen.Next()
Expand Down
40 changes: 35 additions & 5 deletions internal/tunnel/pool.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tunnel

import (
"context"
"fmt"
"io"
"log"
Expand Down Expand Up @@ -41,16 +42,37 @@ type TunnelPool struct {
handlers sync.Map // ConnID (uint32) → chan *Frame
stopCh chan struct{}
wg sync.WaitGroup
ready chan struct{} // closed when at least one tunnel is connected
readyOnce sync.Once
}

// NewTunnelPool constructs a TunnelPool bound to the given engine manager.
// The pool starts with no tunnels connected; the ready channel stays open
// until the first successful connection is established.
func NewTunnelPool(mgr *engine.Manager) *TunnelPool {
	pool := &TunnelPool{
		mgr:     mgr,
		tunnels: map[int]*TunnelConn{},
		stopCh:  make(chan struct{}),
		ready:   make(chan struct{}),
	}
	return pool
}

// WaitReady blocks until the pool has at least one connected tunnel or the
// supplied context is cancelled. It reports true when a tunnel became
// available and false when ctx was cancelled first.
func (p *TunnelPool) WaitReady(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		// Caller gave up before any tunnel came online.
		return false
	case <-p.ready:
		// ready is closed exactly once, when the first tunnel connects.
		return true
	}
}

// HasTunnels reports whether at least one tunnel is currently connected.
func (p *TunnelPool) HasTunnels() bool {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return len(p.tunnels) > 0
}

func (p *TunnelPool) Start() {
p.refreshConnections()

Expand Down Expand Up @@ -162,6 +184,14 @@ func (p *TunnelPool) refreshConnections() {
log.Printf("[tunnel-pool] connected to instance %d (%s:%d)",
inst.ID(), inst.Config.Domain, inst.Config.Port)
}

// Signal readiness once we have at least one tunnel
if len(p.tunnels) > 0 {
p.readyOnce.Do(func() {
close(p.ready)
log.Printf("[tunnel-pool] ready (%d tunnels connected)", len(p.tunnels))
})
}
}

func (p *TunnelPool) connectInstance(inst *engine.Instance) (*TunnelConn, error) {
Expand Down Expand Up @@ -230,12 +260,12 @@ func (p *TunnelPool) readLoop(tc *TunnelConn) {
ch := v.(chan *Frame)
select {
case ch <- frame:
case <-time.After(5 * time.Second):
// Buffer full for too long — connection is stuck, log and drop
log.Printf("[tunnel-pool] instance %d: frame buffer full for conn=%d, dropping frame seq=%d",
default:
// Drop immediately — MUST NOT block readLoop because it serves
// ALL ConnIDs on this tunnel. Blocking here stalls every other
// connection sharing this tunnel.
log.Printf("[tunnel-pool] instance %d: dropping frame conn=%d seq=%d (buffer full)",
tc.inst.ID(), frame.ConnID, frame.SeqNum)
case <-p.stopCh:
return
}
}
}
Expand Down
31 changes: 31 additions & 0 deletions internal/tunnel/splitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,46 @@ func (ps *PacketSplitter) RelayUpstreamToClient(ctx context.Context, client io.W
reorderer := NewReorderer()
var totalBytes int64

// Periodic ticker to:
// 1. Drive gap-skip timeouts even when no new frames arrive
// 2. Detect idle connections (no data for idleLimit → give up)
gapTicker := time.NewTicker(500 * time.Millisecond)
defer gapTicker.Stop()

const idleLimit = 30 * time.Second
lastActivity := time.Now()

for {
select {
case <-ctx.Done():
return totalBytes

case <-gapTicker.C:
// Periodically drive gap-skip even without new frames
for {
data := reorderer.Next()
if data == nil {
break
}
n, err := client.Write(data)
totalBytes += int64(n)
lastActivity = time.Now()
if err != nil {
return totalBytes
}
}
// Idle timeout: if nothing happened for too long, give up
if time.Since(lastActivity) > idleLimit {
return totalBytes
}

case frame, ok := <-ps.incoming:
if !ok {
return totalBytes
}

lastActivity = time.Now()

if frame.IsFIN() || frame.IsRST() {
for {
data := reorderer.Next()
Expand Down
Loading