Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 48 additions & 39 deletions transport/mux/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync/atomic"

"github.com/hashicorp/yamux"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/channel"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
Expand Down Expand Up @@ -99,7 +100,6 @@ func (m *muxProvider) Start() {
// Notify all resources are cleaned up
m.hasCleanedUp.Shutdown()
}()
connect:
for {
// Wait for shutdown or session needed
err = m.muxPermits.Acquire(m.lifetime, 1)
Expand All @@ -110,48 +110,21 @@ func (m *muxProvider) Start() {
m.logger.Info("Getting new connection from connProvider")

var conn net.Conn
conn, err = m.connProvider.NewConnection()
if err != nil {
if m.lifetime.Err() != nil {
return
}
m.muxPermits.Release(1)
m.logger.Info("Couldn't connect to mux TCP destination", tag.Error(err))
continue connect
}

var session *yamux.Session
session, err = m.sessionFn(conn)
if err != nil {
if m.lifetime.Err() != nil {
return
}
m.muxPermits.Release(1)
m.logger.Error("Invalid mux client configuration", tag.Error(err))
continue connect

retryable := func(err error) bool {
return m.lifetime.Err() == nil
}
// Force Yamux to actually send something on the conn to make sure it's alive
_, err = session.Ping()

err = backoff.ThrottleRetry(func() error {
var fnErr error
session, conn, fnErr = m.connectOnce()
return fnErr
}, retryPolicy, retryable)
if err != nil {
if m.lifetime.Err() != nil {
return
} else if errors.Is(err, yamux.ErrConnectionWriteTimeout) {
m.logger.Info("Timed out establishing mux", tag.Error(err),
tag.NewStringTag("remoteAddr", common.GetHost(session.RemoteAddr().String())))
metrics.MuxErrors.WithLabelValues(append(m.metricLabels, "timeout")...).Inc()
} else if err == io.EOF {
m.logger.Info("Remote immediately disconnected. This usually means the listener had queued connections", tag.Error(err),
tag.NewStringTag("remoteAddr", common.GetHost(session.RemoteAddr().String())))
metrics.MuxErrors.WithLabelValues(append(m.metricLabels, "disconnected")...).Inc()
} else {
m.logger.Warn("Unknown error", tag.Error(err), tag.NewStringTag("remoteAddr", common.GetHost(session.RemoteAddr().String())))
metrics.MuxErrors.WithLabelValues(append(m.metricLabels, "unknown")...).Inc()
}
// Make sure session & conn close on error
_ = session.Close()
_ = conn.Close()
// retryable returned false → shutting down
m.muxPermits.Release(1)
continue connect
return
}

m.addNewMux(session, conn)
Expand All @@ -161,6 +134,42 @@ func (m *muxProvider) Start() {
})
}

// connectOnce performs a single end-to-end attempt to establish a mux session:
// it obtains a connection from the connProvider, builds a yamux session on top
// of it via sessionFn, and pings the remote to verify the session is alive.
//
// On success it returns the live session and its underlying connection. On any
// failure it logs the cause, closes whatever was already opened (the conn, and
// the session if one was created), and returns nil, nil and the error, so the
// caller can decide whether to retry.
func (m *muxProvider) connectOnce() (*yamux.Session, net.Conn, error) {
	conn, err := m.connProvider.NewConnection()
	if err != nil {
		m.logger.Info("Couldn't connect to mux TCP destination", tag.Error(err))
		return nil, nil, err
	}

	session, err := m.sessionFn(conn)
	if err != nil {
		m.logger.Error("Invalid mux client configuration", tag.Error(err))
		// Best-effort close: the sessionFn error is the one worth reporting.
		_ = conn.Close()
		return nil, nil, err
	}

	// Force Yamux to actually send something on the conn to make sure it's alive
	if _, err = session.Ping(); err != nil {
		remoteAddr := tag.NewStringTag("remoteAddr", common.GetHost(session.RemoteAddr().String()))
		// Classify with errors.Is throughout so that wrapped errors still land in
		// the right log message and metric label.
		switch {
		case errors.Is(err, yamux.ErrConnectionWriteTimeout):
			m.logger.Info("Timed out establishing mux", tag.Error(err), remoteAddr)
			metrics.MuxErrors.WithLabelValues(append(m.metricLabels, "timeout")...).Inc()
		case errors.Is(err, io.EOF):
			m.logger.Info("Remote immediately disconnected. This usually means the listener had queued connections", tag.Error(err), remoteAddr)
			metrics.MuxErrors.WithLabelValues(append(m.metricLabels, "disconnected")...).Inc()
		default:
			m.logger.Warn("Unknown error", tag.Error(err), remoteAddr)
			metrics.MuxErrors.WithLabelValues(append(m.metricLabels, "unknown")...).Inc()
		}
		// Make sure session & conn close on error; close errors are ignored
		// because the ping failure is the error being reported.
		_ = session.Close()
		_ = conn.Close()
		return nil, nil, err
	}

	return session, conn, nil
}

// Address reports the address of the underlying connProvider by delegating to
// its Address method.
func (m *muxProvider) Address() string {
	addr := m.connProvider.Address()
	return addr
}
Expand Down
Loading