Skip to content

Commit 7d83ba2

Browse files
committed
improvement(mcp): post-merge hardening — protocol negotiation + distributed OAuth lock + typed error dispatch
- Inbound MCP server now negotiates protocolVersion per MCP 2025-06-18: echoes the client's requested version when supported, falls back to our latest. Previously hardcoded to the oldest spec version (2024-11-05). - withMcpOauthRefreshLock now takes a Postgres advisory transaction lock in addition to the in-process Promise chain, so concurrent processes (multi-task ECS) serialize on a per-OAuth-row basis. Previously a refresh race across processes could rotate a token under another process's feet and force re-auth. - categorizeError dispatches on McpOauthAuthorizationRequiredError / UnauthorizedError / McpConnectionError first, only falling back to substring matching for SDK / third-party errors. Adds 502 for connection failures and 503 for cooldown. Tests cover all four typed cases. - discoverTools no longer pretends to handle deferred-side-effect rejections via a dead allSettled().catch() — each side-effect already self-logs; we just swallow per-promise to silence unhandled-rejection warnings.
1 parent 1631b36 commit 7d83ba2

6 files changed

Lines changed: 133 additions & 19 deletions

File tree

apps/sim/app/api/mcp/serve/[serverId]/route.test.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,4 +197,49 @@ describe('MCP Serve Route', () => {
197197
expect(headers['X-API-Key']).toBeUndefined()
198198
expect(mockGenerateInternalToken).toHaveBeenCalledWith('user-1')
199199
})
200+
201+
describe('initialize protocol version negotiation', () => {
202+
async function callInitialize(protocolVersion?: string) {
203+
dbChainMockFns.limit.mockResolvedValueOnce([
204+
{
205+
id: 'server-1',
206+
name: 'Public Server',
207+
workspaceId: 'ws-1',
208+
isPublic: true,
209+
createdBy: 'owner-1',
210+
},
211+
])
212+
const params: Record<string, unknown> = {
213+
capabilities: {},
214+
clientInfo: { name: 'test', version: '1.0.0' },
215+
}
216+
if (protocolVersion !== undefined) params.protocolVersion = protocolVersion
217+
const req = new NextRequest('http://localhost:3000/api/mcp/serve/server-1', {
218+
method: 'POST',
219+
body: JSON.stringify({ jsonrpc: '2.0', id: 1, method: 'initialize', params }),
220+
})
221+
const res = await POST(req, { params: Promise.resolve({ serverId: 'server-1' }) })
222+
return res.json() as Promise<{ result: { protocolVersion: string } }>
223+
}
224+
225+
it('echoes a supported client protocolVersion (2025-06-18)', async () => {
226+
const body = await callInitialize('2025-06-18')
227+
expect(body.result.protocolVersion).toBe('2025-06-18')
228+
})
229+
230+
it('echoes a supported client protocolVersion (2024-11-05)', async () => {
231+
const body = await callInitialize('2024-11-05')
232+
expect(body.result.protocolVersion).toBe('2024-11-05')
233+
})
234+
235+
it('falls back to latest when client requests unknown version', async () => {
236+
const body = await callInitialize('2099-01-01')
237+
expect(body.result.protocolVersion).toBe('2025-06-18')
238+
})
239+
240+
it('falls back to latest when client omits protocolVersion', async () => {
241+
const body = await callInitialize(undefined)
242+
expect(body.result.protocolVersion).toBe('2025-06-18')
243+
})
244+
})
200245
})

apps/sim/app/api/mcp/serve/[serverId]/route.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,22 @@ import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
3636

3737
const logger = createLogger('WorkflowMcpServeAPI')
3838

39+
// Newest first. We echo the client's version when we support it (per
40+
// MCP 2025-06-18 lifecycle spec); otherwise fall back to our latest.
41+
const SUPPORTED_PROTOCOL_VERSIONS = ['2025-06-18', '2025-03-26', '2024-11-05'] as const
42+
const LATEST_PROTOCOL_VERSION = SUPPORTED_PROTOCOL_VERSIONS[0]
43+
44+
function negotiateProtocolVersion(rpcParams: unknown): string {
45+
const requested =
46+
rpcParams && typeof rpcParams === 'object' && 'protocolVersion' in rpcParams
47+
? (rpcParams as { protocolVersion?: unknown }).protocolVersion
48+
: undefined
49+
if (typeof requested === 'string' && SUPPORTED_PROTOCOL_VERSIONS.includes(requested as never)) {
50+
return requested
51+
}
52+
return LATEST_PROTOCOL_VERSION
53+
}
54+
3955
export const dynamic = 'force-dynamic'
4056

4157
interface RouteParams {
@@ -214,7 +230,7 @@ export const POST = withRouteHandler(
214230
switch (method) {
215231
case 'initialize': {
216232
const result: InitializeResult = {
217-
protocolVersion: '2024-11-05',
233+
protocolVersion: negotiateProtocolVersion(rpcParams),
218234
capabilities: { tools: {} },
219235
serverInfo: { name: server.name, version: '1.0.0' },
220236
}

apps/sim/lib/mcp/oauth/storage.ts

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import { mcpServerOauth } from '@sim/db/schema'
88
import { createLogger } from '@sim/logger'
99
import { toError } from '@sim/utils/errors'
1010
import { generateId } from '@sim/utils/id'
11-
import { and, eq, gt } from 'drizzle-orm'
11+
import { and, eq, gt, sql } from 'drizzle-orm'
1212
import { decryptSecret, encryptSecret } from '@/lib/core/security/encryption'
1313

1414
const logger = createLogger('McpOauthStorage')
@@ -227,19 +227,31 @@ export async function clearState(rowId: string): Promise<void> {
227227
}
228228

229229
/**
230-
* Per-process serialization for an OAuth row. Refresh tokens rotate (RFC 6749 §6,
231-
* MCP §2.3.3), so two concurrent refreshes against the same row would race and one
232-
* would receive `invalid_grant`, wiping the credentials. We serialize SDK calls
233-
* that may trigger a refresh on a per-row basis.
230+
* Serialize OAuth row access across all callers, in-process AND across
231+
* processes. Refresh tokens rotate (RFC 6749 §6, MCP §2.3.3), so two concurrent
232+
* refreshes against the same row would race and one would receive
233+
* `invalid_grant`, wiping credentials.
234+
*
235+
* Two-tier locking:
236+
* 1) In-process Promise chain — cheap, avoids DB roundtrips when the same
237+
* Node process holds concurrent callers.
238+
* 2) Postgres advisory transaction lock — blocks across processes; released
239+
* automatically when the transaction ends.
234240
*/
235241
const refreshLocks = new Map<string, Promise<unknown>>()
236242

237243
export async function withMcpOauthRefreshLock<T>(rowId: string, fn: () => Promise<T>): Promise<T> {
238244
const prev = refreshLocks.get(rowId) ?? Promise.resolve()
239-
// Wait for the predecessor to settle (success or failure), discard its
240-
// value/error, then run fn. Each caller awaits its own fn's outcome — errors
241-
// do not propagate across callers in the chain.
242-
const next = prev.catch(() => undefined).then(() => fn())
245+
const next = prev
246+
.catch(() => undefined)
247+
.then(() =>
248+
db.transaction(async (tx) => {
249+
await tx.execute(
250+
sql`SELECT pg_advisory_xact_lock(hashtextextended(${`mcp_oauth_refresh:${rowId}`}, 0))`
251+
)
252+
return fn()
253+
})
254+
)
243255
refreshLocks.set(rowId, next)
244256
const cleanup = () => {
245257
if (refreshLocks.get(rowId) === next) refreshLocks.delete(rowId)

apps/sim/lib/mcp/service.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -601,9 +601,9 @@ class McpService {
601601

602602
// Await cache writes so a follow-up discoverTools sees consistent state.
603603
await Promise.allSettled(cacheWrites)
604-
Promise.allSettled(deferredSideEffects).catch((err) => {
605-
logger.error(`[${requestId}] Error in deferred discovery work:`, err)
606-
})
604+
// Each deferred side-effect self-logs failures, so we just mark the
605+
// promises as handled to avoid unhandled-rejection warnings.
606+
for (const p of deferredSideEffects) p.catch(() => {})
607607

608608
if (mcpConnectionManager) {
609609
for (const conn of liveConnections) {

apps/sim/lib/mcp/utils.test.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
import { UnauthorizedError } from '@modelcontextprotocol/sdk/client/auth.js'
12
import { describe, expect, it } from 'vitest'
23
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
4+
import { McpConnectionError, McpOauthAuthorizationRequiredError } from '@/lib/mcp/types'
35
import {
46
categorizeError,
57
createMcpToolId,
@@ -304,6 +306,32 @@ describe('categorizeError', () => {
304306
expect(result.message).toBe('Unknown error occurred')
305307
})
306308

309+
it.concurrent('returns 401 for McpOauthAuthorizationRequiredError via instanceof', () => {
310+
const error = new McpOauthAuthorizationRequiredError('mcp-a', 'A')
311+
const result = categorizeError(error)
312+
expect(result.status).toBe(401)
313+
expect(result.message).toBe('Authentication required')
314+
})
315+
316+
it.concurrent('returns 401 for SDK UnauthorizedError via instanceof', () => {
317+
const error = new UnauthorizedError('token expired')
318+
const result = categorizeError(error)
319+
expect(result.status).toBe(401)
320+
})
321+
322+
it.concurrent('returns 503 for McpConnectionError with cooldown message', () => {
323+
const error = new McpConnectionError('Server in cooldown — try again shortly.', 'mcp-a')
324+
const result = categorizeError(error)
325+
expect(result.status).toBe(503)
326+
})
327+
328+
it.concurrent('returns 502 for other McpConnectionError', () => {
329+
const error = new McpConnectionError('connect ECONNREFUSED', 'mcp-a')
330+
const result = categorizeError(error)
331+
expect(result.status).toBe(502)
332+
expect(result.message).toBe('Connection failed')
333+
})
334+
307335
it.concurrent('returns 500 for null', () => {
308336
const result = categorizeError(null)
309337
expect(result.status).toBe(500)

apps/sim/lib/mcp/utils.ts

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
1+
import { UnauthorizedError } from '@modelcontextprotocol/sdk/client/auth.js'
12
import { NextResponse } from 'next/server'
23
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
3-
import type { McpApiResponse } from '@/lib/mcp/types'
4+
import {
5+
type McpApiResponse,
6+
McpConnectionError,
7+
McpOauthAuthorizationRequiredError,
8+
} from '@/lib/mcp/types'
49
import { isMcpTool, MCP } from '@/executor/constants'
510

611
export const MCP_CONSTANTS = {
@@ -137,28 +142,36 @@ export function categorizeError(error: unknown): { message: string; status: numb
137142
return { message: 'Unknown error occurred', status: 500 }
138143
}
139144

145+
// Typed dispatch first — our own classes carry definitive intent.
146+
if (error instanceof McpOauthAuthorizationRequiredError || error instanceof UnauthorizedError) {
147+
return { message: 'Authentication required', status: 401 }
148+
}
149+
if (error instanceof McpConnectionError) {
150+
if (error.message.toLowerCase().includes('cooldown')) {
151+
return { message: 'Server temporarily unavailable', status: 503 }
152+
}
153+
return { message: 'Connection failed', status: 502 }
154+
}
155+
156+
// Fall back to substring matching for SDK / third-party errors we don't
157+
// own a typed class for.
140158
const msg = error.message.toLowerCase()
141159

142160
if (msg.includes('timeout')) {
143161
return { message: 'Request timed out', status: 408 }
144162
}
145-
146163
if (msg.includes('cooldown')) {
147164
return { message: 'Server temporarily unavailable', status: 503 }
148165
}
149-
150166
if (msg.includes('not found') || msg.includes('not accessible')) {
151167
return { message: 'Resource not found', status: 404 }
152168
}
153-
154169
if (msg.includes('authentication') || msg.includes('unauthorized')) {
155170
return { message: 'Authentication required', status: 401 }
156171
}
157-
158172
if (msg.includes('invalid') || msg.includes('missing required') || msg.includes('validation')) {
159173
return { message: 'Invalid request parameters', status: 400 }
160174
}
161-
162175
return { message: 'Internal server error', status: 500 }
163176
}
164177

0 commit comments

Comments
 (0)