From 2ed17ea6689203300515528adbf841fedc395932 Mon Sep 17 00:00:00 2001 From: Haifeng He Date: Thu, 14 May 2026 10:38:11 -0700 Subject: [PATCH] Add retry policy for provider connection --- transport/mux/provider.go | 87 +++++++++++++++++++++------------------ 1 file changed, 48 insertions(+), 39 deletions(-) diff --git a/transport/mux/provider.go b/transport/mux/provider.go index 1c1704d..292c281 100644 --- a/transport/mux/provider.go +++ b/transport/mux/provider.go @@ -9,6 +9,7 @@ import ( "sync/atomic" "github.com/hashicorp/yamux" + "go.temporal.io/server/common/backoff" "go.temporal.io/server/common/channel" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" @@ -99,7 +100,6 @@ func (m *muxProvider) Start() { // Notify all resources are cleaned up m.hasCleanedUp.Shutdown() }() - connect: for { // Wait for shutdown or session needed err = m.muxPermits.Acquire(m.lifetime, 1) @@ -110,48 +110,21 @@ func (m *muxProvider) Start() { m.logger.Info("Getting new connection from connProvider") var conn net.Conn - conn, err = m.connProvider.NewConnection() - if err != nil { - if m.lifetime.Err() != nil { - return - } - m.muxPermits.Release(1) - m.logger.Info("Couldn't connect to mux TCP destination", tag.Error(err)) - continue connect - } - var session *yamux.Session - session, err = m.sessionFn(conn) - if err != nil { - if m.lifetime.Err() != nil { - return - } - m.muxPermits.Release(1) - m.logger.Error("Invalid mux client configuration", tag.Error(err)) - continue connect + + retryable := func(err error) bool { + return m.lifetime.Err() == nil } - // Force Yamux to actually send something on the conn to make sure it's alive - _, err = session.Ping() + + err = backoff.ThrottleRetry(func() error { + var fnErr error + session, conn, fnErr = m.connectOnce() + return fnErr + }, retryPolicy, retryable) if err != nil { - if m.lifetime.Err() != nil { - return - } else if errors.Is(err, yamux.ErrConnectionWriteTimeout) { - m.logger.Info("Timed out establishing mux", tag.Error(err), - tag.NewStringTag("remoteAddr", common.GetHost(session.RemoteAddr().String()))) - metrics.MuxErrors.WithLabelValues(append(m.metricLabels, "timeout")...).Inc() - } else if err == io.EOF { - m.logger.Info("Remote immediately disconnected. This usually means the listener had queued connections", tag.Error(err), - tag.NewStringTag("remoteAddr", common.GetHost(session.RemoteAddr().String()))) - metrics.MuxErrors.WithLabelValues(append(m.metricLabels, "disconnected")...).Inc() - } else { - m.logger.Warn("Unknown error", tag.Error(err), tag.NewStringTag("remoteAddr", common.GetHost(session.RemoteAddr().String()))) - metrics.MuxErrors.WithLabelValues(append(m.metricLabels, "unknown")...).Inc() - } - // Make sure session & conn close on error - _ = session.Close() - _ = conn.Close() + // retryable returned false → shutting down m.muxPermits.Release(1) - continue connect + return } m.addNewMux(session, conn) @@ -161,6 +134,42 @@ func (m *muxProvider) Start() { }) } +// connectOnce performs a single end-to-end attempt to establish a mux session: +// acquire a connection from the connProvider, build a yamux session on top of it, +// and verify it's alive with a Ping. Any failure cleans up partially-built state +// and returns the error so the caller (backoff.ThrottleRetry) can retry. +func (m *muxProvider) connectOnce() (*yamux.Session, net.Conn, error) { + conn, err := m.connProvider.NewConnection() + if err != nil { + m.logger.Info("Couldn't connect to mux TCP destination", tag.Error(err)) + return nil, nil, err + } + session, err := m.sessionFn(conn) + if err != nil { + m.logger.Error("Invalid mux client configuration", tag.Error(err)) + _ = conn.Close() + return nil, nil, err + } + // Force Yamux to actually send something on the conn to make sure it's alive + if _, err = session.Ping(); err != nil { + remoteAddr := tag.NewStringTag("remoteAddr", common.GetHost(session.RemoteAddr().String())) + if errors.Is(err, yamux.ErrConnectionWriteTimeout) { + m.logger.Info("Timed out establishing mux", tag.Error(err), remoteAddr) + metrics.MuxErrors.WithLabelValues(append(m.metricLabels, "timeout")...).Inc() + } else if err == io.EOF { + m.logger.Info("Remote immediately disconnected. This usually means the listener had queued connections", tag.Error(err), remoteAddr) + metrics.MuxErrors.WithLabelValues(append(m.metricLabels, "disconnected")...).Inc() + } else { + m.logger.Warn("Unknown error", tag.Error(err), remoteAddr) + metrics.MuxErrors.WithLabelValues(append(m.metricLabels, "unknown")...).Inc() + } + _ = session.Close() + _ = conn.Close() + return nil, nil, err + } + return session, conn, nil +} + func (m *muxProvider) Address() string { return m.connProvider.Address() }