Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions cmd/altinity-mcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,9 +454,15 @@ func (a *application) startHTTPServer(cfg config.Config, mcpServer *mcp.Server)

tokenInjector := a.createTokenInjector()
dtInjector := a.dynamicToolsInjector
// Stateless: true makes the streamable HTTP transport carry no per-pod
// session state — each request stands alone. Required for replicas>=2
// behind a non-sticky LB, where consecutive tool calls from one client
// may land on different pods. Trade-off: server-initiated requests
// (sampling, roots/list, etc.) are not supported; altinity-mcp only
// uses client-initiated tool calls so this is safe.
httpServer := mcp.NewStreamableHTTPHandler(func(r *http.Request) *mcp.Server {
return mcpServer
}, nil)
}, &mcp.StreamableHTTPOptions{Stateless: true})

mux := http.NewServeMux()
transportHandler := serverInjector(tokenInjector(dtInjector(httpServer)))
Expand Down Expand Up @@ -484,9 +490,15 @@ func (a *application) startHTTPServer(cfg config.Config, mcpServer *mcp.Server)
httpHandler = stripTrailingSlash(corsHandler(mux))
} else {
// Use standard HTTP server without dynamic paths
// Stateless: true makes the streamable HTTP transport carry no per-pod
// session state — each request stands alone. Required for replicas>=2
// behind a non-sticky LB, where consecutive tool calls from one client
// may land on different pods. Trade-off: server-initiated requests
// (sampling, roots/list, etc.) are not supported; altinity-mcp only
// uses client-initiated tool calls so this is safe.
httpServer := mcp.NewStreamableHTTPHandler(func(r *http.Request) *mcp.Server {
return mcpServer
}, nil)
}, &mcp.StreamableHTTPOptions{Stateless: true})
dtInjector := a.dynamicToolsInjector
mux := http.NewServeMux()
transportHandler := serverInjector(dtInjector(httpServer))
Expand Down Expand Up @@ -964,8 +976,6 @@ type application struct {
mcpServer *altinitymcp.ClickHouseJWEServer
httpSrv *http.Server
httpSrvMutex sync.RWMutex
oauthState *oauthStateStore
oauthStateMu sync.Mutex
configFile string
configMutex sync.RWMutex
stopConfigReload chan struct{}
Expand All @@ -985,15 +995,6 @@ func (a *application) getHTTPServer() *http.Server {
return a.httpSrv
}

func (a *application) getOAuthStateStore() *oauthStateStore {
a.oauthStateMu.Lock()
defer a.oauthStateMu.Unlock()
if a.oauthState == nil {
a.oauthState = newOAuthStateStore()
}
return a.oauthState
}

func newApplication(ctx context.Context, cfg config.Config, cmd CommandInterface) (*application, error) {
if err := validateOAuthRuntimeConfig(cfg); err != nil {
return nil, err
Expand Down Expand Up @@ -1052,7 +1053,6 @@ func newApplication(ctx context.Context, cfg config.Config, cmd CommandInterface
app := &application{
config: cfg,
mcpServer: mcpServer,
oauthState: newOAuthStateStore(),
configFile: cmd.String("config"),
stopConfigReload: make(chan struct{}),
}
Expand Down
33 changes: 0 additions & 33 deletions cmd/altinity-mcp/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3908,39 +3908,6 @@ func TestHealthHandler_CHUnavailable(t *testing.T) {
require.Equal(t, "unhealthy", body["status"])
}

func TestOAuthStateStoreEviction(t *testing.T) {
t.Parallel()
store := newOAuthStateStore()

// Fill pending auth to capacity
for i := 0; i < maxOAuthStateEntries; i++ {
store.putPendingAuth(fmt.Sprintf("pending-%d", i), oauthPendingAuth{
ExpiresAt: time.Now().Add(time.Duration(i) * time.Second),
})
}

// Adding one more should evict the oldest
store.putPendingAuth("new-pending", oauthPendingAuth{ExpiresAt: time.Now().Add(time.Hour)})
_, ok := store.consumePendingAuth("pending-0") // oldest should be evicted
require.False(t, ok)
_, ok = store.consumePendingAuth("new-pending")
require.True(t, ok)

// Fill auth codes to capacity
for i := 0; i < maxOAuthStateEntries; i++ {
store.putAuthCode(fmt.Sprintf("code-%d", i), oauthIssuedCode{
ExpiresAt: time.Now().Add(time.Duration(i) * time.Second),
})
}

// Adding one more should evict the oldest
store.putAuthCode("new-code", oauthIssuedCode{ExpiresAt: time.Now().Add(time.Hour)})
_, ok = store.consumeAuthCode("code-0")
require.False(t, ok)
_, ok = store.consumeAuthCode("new-code")
require.True(t, ok)
}

func TestToolInputSettingsCLIFlag(t *testing.T) {
cases := []struct {
name string
Expand Down
Loading