From 31305cf2c3fae8b020bfb5e2b02db6fd27a320fd Mon Sep 17 00:00:00 2001 From: Georgiy Tarasov Date: Thu, 16 Apr 2026 17:12:12 +0200 Subject: [PATCH 1/6] wip --- apps/code/src/main/menu.ts | 24 ++++++++++++ apps/code/src/main/services/auth/service.ts | 43 +++++++++++++++++++++ 2 files changed, 67 insertions(+) diff --git a/apps/code/src/main/menu.ts b/apps/code/src/main/menu.ts index 48e475207..dd1bdbfc4 100644 --- a/apps/code/src/main/menu.ts +++ b/apps/code/src/main/menu.ts @@ -10,6 +10,7 @@ import { } from "electron"; import { container } from "./di/container"; import { MAIN_TOKENS } from "./di/tokens"; +import type { AuthService } from "./services/auth/service"; import type { McpAppsService } from "./services/mcp-apps/service"; import type { UIService } from "./services/ui/service"; import type { UpdatesService } from "./services/updates/service"; @@ -132,6 +133,29 @@ function buildFileMenu(): MenuItemConstructorOptions { .invalidateToken(); }, }, + { + label: "Refresh OAuth token", + click: () => { + container + .get(MAIN_TOKENS.AuthService) + .refreshAccessToken() + .then(() => { + dialog.showMessageBox({ + type: "info", + title: "OAuth Token Refreshed", + message: + "A fresh access token was fetched from PostHog.\nThe background refresh timer has been re-armed.", + }); + }) + .catch((err: Error) => { + dialog.showMessageBox({ + type: "error", + title: "OAuth Token Refresh Failed", + message: err.message, + }); + }); + }, + }, { label: "Refresh MCP Apps discovery", click: () => { diff --git a/apps/code/src/main/services/auth/service.ts b/apps/code/src/main/services/auth/service.ts index 973899889..74a75349a 100644 --- a/apps/code/src/main/services/auth/service.ts +++ b/apps/code/src/main/services/auth/service.ts @@ -31,6 +31,12 @@ import { const log = logger.scope("auth-service"); const TOKEN_EXPIRY_SKEW_MS = 60_000; +// Proactively refresh 30 min before expiry. The lazy skew check in +// ensureValidSession() isn't enough on its own: long-running agent turns can +// hold a token reference for many minutes, and the PostHog MCP server needs a +// valid bearer on *every* turn. A background refresh keeps the in-memory token +// fresh even when nothing else in the app happens to call getValidAccessToken(). +const TOKEN_REFRESH_BUFFER_MS = 30 * 60 * 1000; type FetchLike = ( input: string | Request, init?: RequestInit, @@ -73,6 +79,7 @@ export class AuthService extends TypedEventEmitter { private session: InMemorySession | null = null; private initializePromise: Promise | null = null; private refreshPromise: Promise | null = null; + private refreshTimeoutId: NodeJS.Timeout | null = null; constructor( @inject(MAIN_TOKENS.AuthPreferenceRepository) private readonly authPreferenceRepository: IAuthPreferenceRepository, @@ -221,6 +228,10 @@ export class AuthService extends TypedEventEmitter { async logout(): Promise { const { cloudRegion, projectId } = this.state; + if (this.refreshTimeoutId) { + clearTimeout(this.refreshTimeoutId); + this.refreshTimeoutId = null; + } this.authSessionRepository.clearCurrent(); this.session = null; this.setAnonymousState({ cloudRegion, projectId }); @@ -457,8 +468,36 @@ export class AuthService extends TypedEventEmitter { availableOrgIds: session.availableOrgIds, needsScopeReauth: false, }); + this.scheduleTokenRefresh(); await this.updateCodeAccessFromSession(); } + private scheduleTokenRefresh(): void { + if (this.refreshTimeoutId) { + clearTimeout(this.refreshTimeoutId); + this.refreshTimeoutId = null; + } + + if (!this.session) { + return; + } + + const timeUntilRefresh = + this.session.accessTokenExpiresAt - Date.now() - TOKEN_REFRESH_BUFFER_MS; + + const fire = () => { + this.refreshTimeoutId = null; + this.refreshAccessToken().catch((error) => { + log.warn("Proactive token refresh failed", { error }); + }); + }; + + if (timeUntilRefresh <= 0) { + fire(); + return; + } + + this.refreshTimeoutId = setTimeout(fire, timeUntilRefresh); + } private persistSession(input: { refreshToken: string; cloudRegion: CloudRegion; @@ -598,6 +637,10 @@ export class AuthService extends TypedEventEmitter { } @preDestroy() shutdown(): void { + if (this.refreshTimeoutId) { + clearTimeout(this.refreshTimeoutId); + this.refreshTimeoutId = null; + } this.connectivityUnsubscribe?.(); this.connectivityUnsubscribe = null; powerMonitor.off("resume", this.handleResume); From 4c02095a62564afc84aa38100e261704d5c9fb1f Mon Sep 17 00:00:00 2001 From: Georgiy Tarasov Date: Thu, 16 Apr 2026 17:12:17 +0200 Subject: [PATCH 2/6] wip --- apps/code/src/main/menu.ts | 6 ++-- .../src/main/services/agent/service.test.ts | 7 ++++ apps/code/src/main/services/agent/service.ts | 33 +++++++++++++++++++ apps/code/src/main/services/auth/service.ts | 26 +++++++++++++++ packages/agent/src/acp-extensions.ts | 3 ++ .../agent/src/adapters/claude/claude-agent.ts | 24 ++++++++++++-- 6 files changed, 94 insertions(+), 5 deletions(-) diff --git a/apps/code/src/main/menu.ts b/apps/code/src/main/menu.ts index dd1bdbfc4..e4055b98d 100644 --- a/apps/code/src/main/menu.ts +++ b/apps/code/src/main/menu.ts @@ -138,13 +138,13 @@ function buildFileMenu(): MenuItemConstructorOptions { click: () => { container .get(MAIN_TOKENS.AuthService) - .refreshAccessToken() + .scheduleRefresh() .then(() => { dialog.showMessageBox({ type: "info", - title: "OAuth Token Refreshed", + title: "OAuth Token Refresh", message: - "A fresh access token was fetched from PostHog.\nThe background refresh timer has been re-armed.", + "Token refresh scheduled.\nIt will execute when all active agent turns complete.", }); }) .catch((err: Error) => { diff --git a/apps/code/src/main/services/agent/service.test.ts b/apps/code/src/main/services/agent/service.test.ts index 02e1642cc..eb2e5697f 100644 --- a/apps/code/src/main/services/agent/service.test.ts +++ b/apps/code/src/main/services/agent/service.test.ts @@ -176,6 +176,12 @@ function createMockDependencies() { notifyToolResult: vi.fn(), notifyToolCancelled: vi.fn(), }, + authService: { + on: vi.fn(), + off: vi.fn(), + setRefreshBlocker: vi.fn(), + flushPendingRefresh: vi.fn().mockResolvedValue(undefined), + }, }; } @@ -201,6 +207,7 @@ describe("AgentService", () => { deps.posthogPluginService as never, deps.agentAuthAdapter as never, deps.mcpAppsService as never, + deps.authService as never, ); }); diff --git a/apps/code/src/main/services/agent/service.ts b/apps/code/src/main/services/agent/service.ts index 5ae20bf0b..433bc9914 100644 --- a/apps/code/src/main/services/agent/service.ts +++ b/apps/code/src/main/services/agent/service.ts @@ -44,6 +44,8 @@ import { MAIN_TOKENS } from "../../di/tokens"; import { isDevBuild } from "../../utils/env"; import { logger } from "../../utils/logger"; import { TypedEventEmitter } from "../../utils/typed-event-emitter"; +import { AuthServiceEvent } from "../auth/schemas"; +import type { AuthService } from "../auth/service"; import type { FsService } from "../fs/service"; import type { McpAppsService } from "../mcp-apps/service"; import type { PosthogPluginService } from "../posthog-plugin/service"; @@ -322,6 +324,7 @@ export class AgentService extends TypedEventEmitter { private posthogPluginService: PosthogPluginService; private agentAuthAdapter: AgentAuthAdapter; private mcpAppsService: McpAppsService; + private authService: AuthService; constructor( @inject(MAIN_TOKENS.ProcessTrackingService) @@ -336,6 +339,8 @@ export class AgentService extends TypedEventEmitter { agentAuthAdapter: AgentAuthAdapter, @inject(MAIN_TOKENS.McpAppsService) mcpAppsService: McpAppsService, + @inject(MAIN_TOKENS.AuthService) + authService: AuthService, ) { super(); this.processTracking = processTracking; @@ -344,8 +349,16 @@ export class AgentService extends TypedEventEmitter { this.posthogPluginService = posthogPluginService; this.agentAuthAdapter = agentAuthAdapter; this.mcpAppsService = mcpAppsService; + this.authService = authService; powerMonitor.on("resume", () => this.checkIdleDeadlines()); + + this.authService.setRefreshBlocker(() => this.hasActiveSessions()); + this.authService.on(AuthServiceEvent.StateChanged, (state) => { + if (state.status === "authenticated") { + this.refreshMcpForActiveSessions(); + } + }); } /** @@ -884,6 +897,7 @@ When creating pull requests, add the following footer at the end of the PR descr this.sleepService.release(sessionId); if (!this.hasActiveSessions()) { + this.authService.flushPendingRefresh(); this.emit(AgentServiceEvent.SessionsIdle, undefined); } } @@ -1074,6 +1088,25 @@ For git operations while detached: return `Your worktree is back on branch \`${context.branchName}\`. Normal git commands work again.`; } + private async refreshMcpForActiveSessions(): Promise { + for (const [taskRunId, session] of this.sessions) { + try { + const mcpServers = await this.agentAuthAdapter.buildMcpServers( + session.config.credentials, + ); + await session.clientSideConnection.extNotification( + POSTHOG_NOTIFICATIONS.REFRESH_MCP, + { mcpServers }, + ); + } catch (err) { + log.warn("Failed to push MCP refresh", { + taskRunId, + error: err instanceof Error ? err.message : String(err), + }); + } + } + } + @preDestroy() async cleanupAll(): Promise { for (const { handle } of this.idleTimeouts.values()) clearTimeout(handle); diff --git a/apps/code/src/main/services/auth/service.ts b/apps/code/src/main/services/auth/service.ts index 74a75349a..32ca4e243 100644 --- a/apps/code/src/main/services/auth/service.ts +++ b/apps/code/src/main/services/auth/service.ts @@ -80,6 +80,8 @@ export class AuthService extends TypedEventEmitter { private initializePromise: Promise | null = null; private refreshPromise: Promise | null = null; private refreshTimeoutId: NodeJS.Timeout | null = null; + private isRefreshBlocked: (() => boolean) | null = null; + private pendingRefresh = false; constructor( @inject(MAIN_TOKENS.AuthPreferenceRepository) private readonly authPreferenceRepository: IAuthPreferenceRepository, @@ -137,6 +139,26 @@ export class AuthService extends TypedEventEmitter { apiHost: getCloudUrlFromRegion(session.cloudRegion), }; } + /** Register a callback that returns true when proactive token refreshes should be deferred. */ + setRefreshBlocker(fn: () => boolean): void { + this.isRefreshBlocked = fn; + } + /** Request a token refresh, deferring if a blocker is active. */ + async scheduleRefresh(): Promise { + if (this.isRefreshBlocked?.()) { + this.pendingRefresh = true; + return; + } + await this.refreshAccessToken(); + } + /** Execute a deferred refresh if one is pending. Called by the blocker owner when it becomes unblocked. */ + async flushPendingRefresh(): Promise { + if (!this.pendingRefresh) return; + this.pendingRefresh = false; + await this.refreshAccessToken().catch((error) => { + log.warn("Deferred token refresh failed", { error }); + }); + } async invalidateAccessTokenForTest(): Promise { await this.initialize(); @@ -486,6 +508,10 @@ export class AuthService extends TypedEventEmitter { const fire = () => { this.refreshTimeoutId = null; + if (this.isRefreshBlocked?.()) { + this.pendingRefresh = true; + return; + } this.refreshAccessToken().catch((error) => { log.warn("Proactive token refresh failed", { error }); }); diff --git a/packages/agent/src/acp-extensions.ts b/packages/agent/src/acp-extensions.ts index 62a2a1083..2e15d76a2 100644 --- a/packages/agent/src/acp-extensions.ts +++ b/packages/agent/src/acp-extensions.ts @@ -63,6 +63,9 @@ export const POSTHOG_NOTIFICATIONS = { /** Token usage update for a session turn */ USAGE_UPDATE: "_posthog/usage_update", + + /** Client requests MCP server reconnection with updated credentials */ + REFRESH_MCP: "_posthog/refresh_mcp", } as const; type NotificationMethod = diff --git a/packages/agent/src/adapters/claude/claude-agent.ts b/packages/agent/src/adapters/claude/claude-agent.ts index 54842af00..ce130c26b 100644 --- a/packages/agent/src/adapters/claude/claude-agent.ts +++ b/packages/agent/src/adapters/claude/claude-agent.ts @@ -44,7 +44,7 @@ import { } from "@anthropic-ai/claude-agent-sdk"; import { v7 as uuidv7 } from "uuid"; import packageJson from "../../../package.json" with { type: "json" }; -import { POSTHOG_NOTIFICATIONS } from "../../acp-extensions"; +import { isNotification, POSTHOG_NOTIFICATIONS } from "../../acp-extensions"; import { unreachable, withTimeout } from "../../utils/common"; import { Logger } from "../../utils/logger"; import { Pushable } from "../../utils/streams"; @@ -640,6 +640,26 @@ export class ClaudeAcpAgent extends BaseAcpAgent { await this.session.query.interrupt(); } + async extNotification( + method: string, + params: Record, + ): Promise { + if (isNotification(method, POSTHOG_NOTIFICATIONS.REFRESH_MCP)) { + const mcpServers = parseMcpServers( + params as Pick, + ); + this.logger.info("Refreshing MCP servers", { + serverCount: Object.keys(mcpServers).length, + }); + const result = await this.session.query.setMcpServers(mcpServers); + this.logger.info("MCP servers refreshed", { + added: result.added, + removed: result.removed, + errors: result.errors, + }); + } + } + async unstable_setSessionModel( params: SetSessionModelRequest, ): Promise { @@ -837,7 +857,7 @@ export class ClaudeAcpAgent extends BaseAcpAgent { CODE_EXECUTION_MODES.includes(meta.permissionMode as CodeExecutionMode) ? (meta.permissionMode as CodeExecutionMode) : "default"; - + this.logger.info("mcpServers", { mcpServers }); const options = buildSessionOptions({ cwd, mcpServers, From 85ef4944b428cf104ff0aa9628ee64682cff14c9 Mon Sep 17 00:00:00 2001 From: Georgiy Tarasov Date: Thu, 16 Apr 2026 18:17:03 +0200 Subject: [PATCH 3/6] wip --- apps/code/src/main/services/agent/service.ts | 27 ++++++++++++++++++- apps/code/src/main/services/auth/service.ts | 10 +++---- .../agent/src/adapters/claude/claude-agent.ts | 17 ++++++++++-- 3 files changed, 45 insertions(+), 9 deletions(-) diff --git a/apps/code/src/main/services/agent/service.ts b/apps/code/src/main/services/agent/service.ts index 433bc9914..e15e7f941 100644 --- a/apps/code/src/main/services/agent/service.ts +++ b/apps/code/src/main/services/agent/service.ts @@ -325,6 +325,7 @@ export class AgentService extends TypedEventEmitter { private agentAuthAdapter: AgentAuthAdapter; private mcpAppsService: McpAppsService; private authService: AuthService; + private mcpRefreshDebounce: NodeJS.Timeout | null = null; constructor( @inject(MAIN_TOKENS.ProcessTrackingService) @@ -356,7 +357,7 @@ export class AgentService extends TypedEventEmitter { this.authService.setRefreshBlocker(() => this.hasActiveSessions()); this.authService.on(AuthServiceEvent.StateChanged, (state) => { if (state.status === "authenticated") { - this.refreshMcpForActiveSessions(); + this.scheduleRefreshMcp(); } }); } @@ -1088,12 +1089,32 @@ For git operations while detached: return `Your worktree is back on branch \`${context.branchName}\`. Normal git commands work again.`; } + /** Debounce rapid StateChanged events into a single MCP refresh. */ + private scheduleRefreshMcp(): void { + if (this.mcpRefreshDebounce) { + clearTimeout(this.mcpRefreshDebounce); + } + this.mcpRefreshDebounce = setTimeout(() => { + this.mcpRefreshDebounce = null; + this.refreshMcpForActiveSessions(); + }, 500); + } + private async refreshMcpForActiveSessions(): Promise { for (const [taskRunId, session] of this.sessions) { try { const mcpServers = await this.agentAuthAdapter.buildMcpServers( session.config.credentials, ); + log.info("Refreshing MCP servers for session", { + taskRunId, + serverCount: mcpServers.length, + servers: mcpServers.map((s) => ({ + name: s.name, + url: s.url, + headerCount: s.headers.length, + })), + }); await session.clientSideConnection.extNotification( POSTHOG_NOTIFICATIONS.REFRESH_MCP, { mcpServers }, @@ -1111,6 +1132,10 @@ For git operations while detached: async cleanupAll(): Promise { for (const { handle } of this.idleTimeouts.values()) clearTimeout(handle); this.idleTimeouts.clear(); + if (this.mcpRefreshDebounce) { + clearTimeout(this.mcpRefreshDebounce); + this.mcpRefreshDebounce = null; + } const sessionIds = Array.from(this.sessions.keys()); log.info("Cleaning up all agent sessions", { sessionCount: sessionIds.length, diff --git a/apps/code/src/main/services/auth/service.ts b/apps/code/src/main/services/auth/service.ts index 32ca4e243..fb4d8bc9a 100644 --- a/apps/code/src/main/services/auth/service.ts +++ b/apps/code/src/main/services/auth/service.ts @@ -517,12 +517,10 @@ export class AuthService extends TypedEventEmitter { }); }; - if (timeUntilRefresh <= 0) { - fire(); - return; - } - - this.refreshTimeoutId = setTimeout(fire, timeUntilRefresh); + // Always schedule via setTimeout — never call fire() synchronously. + // A synchronous call during syncAuthenticatedSession() would cascade: + // fire → refreshAccessToken → syncAuthenticatedSession → scheduleTokenRefresh → fire … + this.refreshTimeoutId = setTimeout(fire, Math.max(timeUntilRefresh, 0)); } private persistSession(input: { refreshToken: string; diff --git a/packages/agent/src/adapters/claude/claude-agent.ts b/packages/agent/src/adapters/claude/claude-agent.ts index ce130c26b..310614c97 100644 --- a/packages/agent/src/adapters/claude/claude-agent.ts +++ b/packages/agent/src/adapters/claude/claude-agent.ts @@ -857,10 +857,14 @@ export class ClaudeAcpAgent extends BaseAcpAgent { CODE_EXECUTION_MODES.includes(meta.permissionMode as CodeExecutionMode) ? (meta.permissionMode as CodeExecutionMode) : "default"; - this.logger.info("mcpServers", { mcpServers }); + // Route our MCP servers through setMcpServers() below rather than Options.mcpServers. + // Servers passed via Options go into Claude Code's appState.mcp, which setMcpServers() + // does NOT touch — it only reconciles dynamicMcpState. Registering via setMcpServers + // from the start keeps everything in dynamicMcpState so later refresh calls cleanly + // replace the existing connection instead of creating a second one. const options = buildSessionOptions({ cwd, - mcpServers, + mcpServers: {}, permissionMode, canUseTool: this.createCanUseTool(sessionId, meta?.allowedDomains), logger: this.logger, @@ -997,6 +1001,15 @@ export class ClaudeAcpAgent extends BaseAcpAgent { } } + if (Object.keys(mcpServers).length > 0) { + const result = await q.setMcpServers(mcpServers); + this.logger.info("MCP servers registered dynamically", { + sessionId, + added: result.added, + errors: result.errors, + }); + } + const settingsModel = settingsManager.getSettings().model; const metaModel = meta?.model; const resolvedModelId = From 46b6f2463486b98b92a088871638e465f605d192 Mon Sep 17 00:00:00 2001 From: Georgiy Tarasov Date: Fri, 17 Apr 2026 11:40:46 +0200 Subject: [PATCH 4/6] mcp proxy --- apps/code/src/main/di/container.ts | 2 + apps/code/src/main/di/tokens.ts | 1 + apps/code/src/main/menu.ts | 24 +- .../main/services/agent/auth-adapter.test.ts | 34 ++- .../src/main/services/agent/auth-adapter.ts | 36 ++- apps/code/src/main/services/agent/service.ts | 58 +--- .../src/main/services/mcp-proxy/service.ts | 287 ++++++++++++++++++ .../agent/src/adapters/claude/claude-agent.ts | 38 +-- 8 files changed, 373 insertions(+), 107 deletions(-) create mode 100644 apps/code/src/main/services/mcp-proxy/service.ts diff --git a/apps/code/src/main/di/container.ts b/apps/code/src/main/di/container.ts index f45163c53..ba30b8fa3 100644 --- a/apps/code/src/main/di/container.ts +++ b/apps/code/src/main/di/container.ts @@ -32,6 +32,7 @@ import { LinearIntegrationService } from "../services/linear-integration/service import { LlmGatewayService } from "../services/llm-gateway/service"; import { McpAppsService } from "../services/mcp-apps/service"; import { McpCallbackService } from "../services/mcp-callback/service"; +import { McpProxyService } from "../services/mcp-proxy/service"; import { NotificationService } from "../services/notification/service"; import { OAuthService } from "../services/oauth/service"; import { PosthogPluginService } from "../services/posthog-plugin/service"; @@ -66,6 +67,7 @@ container.bind(MAIN_TOKENS.AgentAuthAdapter).to(AgentAuthAdapter); container.bind(MAIN_TOKENS.AgentService).to(AgentService); container.bind(MAIN_TOKENS.AuthService).to(AuthService); container.bind(MAIN_TOKENS.AuthProxyService).to(AuthProxyService); +container.bind(MAIN_TOKENS.McpProxyService).to(McpProxyService); container.bind(MAIN_TOKENS.ArchiveService).to(ArchiveService); container.bind(MAIN_TOKENS.SuspensionService).to(SuspensionService); container.bind(MAIN_TOKENS.AppLifecycleService).to(AppLifecycleService); diff --git a/apps/code/src/main/di/tokens.ts b/apps/code/src/main/di/tokens.ts index 27bdbcafc..1b980855b 100644 --- a/apps/code/src/main/di/tokens.ts +++ b/apps/code/src/main/di/tokens.ts @@ -23,6 +23,7 @@ export const MAIN_TOKENS = Object.freeze({ AgentService: Symbol.for("Main.AgentService"), AuthService: Symbol.for("Main.AuthService"), AuthProxyService: Symbol.for("Main.AuthProxyService"), + McpProxyService: Symbol.for("Main.McpProxyService"), ArchiveService: Symbol.for("Main.ArchiveService"), SuspensionService: Symbol.for("Main.SuspensionService"), AppLifecycleService: Symbol.for("Main.AppLifecycleService"), diff --git a/apps/code/src/main/menu.ts b/apps/code/src/main/menu.ts index e4055b98d..6805ba501 100644 --- a/apps/code/src/main/menu.ts +++ b/apps/code/src/main/menu.ts @@ -134,7 +134,7 @@ function buildFileMenu(): MenuItemConstructorOptions { }, }, { - label: "Refresh OAuth token", + label: "Schedule refresh of OAuth token", click: () => { container .get(MAIN_TOKENS.AuthService) @@ -156,6 +156,28 @@ function buildFileMenu(): MenuItemConstructorOptions { }); }, }, + { + label: "Force refresh of OAuth token", + click: () => { + container + .get(MAIN_TOKENS.AuthService) + .refreshAccessToken() + .then(() => { + dialog.showMessageBox({ + type: "info", + title: "OAuth Token Refreshed", + message: "Access token refreshed successfully.", + }); + }) + .catch((err: Error) => { + dialog.showMessageBox({ + type: "error", + title: "OAuth Token Refresh Failed", + message: err.message, + }); + }); + }, + }, { label: "Refresh MCP Apps discovery", click: () => { diff --git a/apps/code/src/main/services/agent/auth-adapter.test.ts b/apps/code/src/main/services/agent/auth-adapter.test.ts index acb58fd45..102c2ddb8 100644 --- a/apps/code/src/main/services/agent/auth-adapter.test.ts +++ b/apps/code/src/main/services/agent/auth-adapter.test.ts @@ -50,6 +50,14 @@ function createDependencies() { authProxy: { start: vi.fn().mockResolvedValue("http://127.0.0.1:9999"), }, + mcpProxy: { + start: vi.fn().mockResolvedValue(undefined), + register: vi + .fn() + .mockImplementation( + (id: string) => `http://127.0.0.1:9998/${encodeURIComponent(id)}`, + ), + }, }; } @@ -68,6 +76,7 @@ describe("AgentAuthAdapter", () => { adapter = new AgentAuthAdapter( deps.authService as never, deps.authProxy as never, + deps.mcpProxy as never, ); }); @@ -75,20 +84,21 @@ describe("AgentAuthAdapter", () => { vi.restoreAllMocks(); }); - it("builds the default PostHog MCP server", async () => { + it("builds the default PostHog MCP server routed through the local proxy", async () => { const servers = await adapter.buildMcpServers(baseCredentials); + expect(deps.mcpProxy.register).toHaveBeenCalledWith( + "posthog", + "https://mcp.posthog.com/mcp", + ); expect(servers).toEqual( expect.arrayContaining([ expect.objectContaining({ name: "posthog", type: "http", - url: "https://mcp.posthog.com/mcp", - headers: expect.arrayContaining([ - { - name: "Authorization", - value: "Bearer test-access-token", - }, + url: "http://127.0.0.1:9998/posthog", + headers: expect.not.arrayContaining([ + expect.objectContaining({ name: "Authorization" }), ]), }), ]), @@ -152,14 +162,16 @@ describe("AgentAuthAdapter", () => { const servers = await adapter.buildMcpServers(baseCredentials); + expect(deps.mcpProxy.register).toHaveBeenCalledWith( + "installation-inst-2", + "https://proxy.posthog.com/inst-2/", + ); expect(servers).toEqual( expect.arrayContaining([ expect.objectContaining({ name: "secure-server", - url: "https://proxy.posthog.com/inst-2/", - headers: [ - { name: "Authorization", value: "Bearer test-access-token" }, - ], + url: "http://127.0.0.1:9998/installation-inst-2", + headers: [], }), ]), ); diff --git a/apps/code/src/main/services/agent/auth-adapter.ts b/apps/code/src/main/services/agent/auth-adapter.ts index ad635474a..6fc68682f 100644 --- a/apps/code/src/main/services/agent/auth-adapter.ts +++ b/apps/code/src/main/services/agent/auth-adapter.ts @@ -5,6 +5,7 @@ import { MAIN_TOKENS } from "../../di/tokens"; import { logger } from "../../utils/logger"; import type { AuthService } from "../auth/service"; import type { AuthProxyService } from "../auth-proxy/service"; +import type { McpProxyService } from "../mcp-proxy/service"; import type { Credentials } from "./schemas"; const log = logger.scope("agent-auth-adapter"); @@ -37,6 +38,8 @@ export class AgentAuthAdapter { private readonly authService: AuthService, @inject(MAIN_TOKENS.AuthProxyService) private readonly authProxy: AuthProxyService, + @inject(MAIN_TOKENS.McpProxyService) + private readonly mcpProxy: McpProxyService, ) {} createPosthogConfig(credentials: Credentials): AgentPosthogConfig { @@ -51,14 +54,24 @@ export class AgentAuthAdapter { async buildMcpServers(credentials: Credentials): Promise { const servers: AcpMcpServer[] = []; const mcpUrl = this.getPostHogMcpUrl(credentials.apiHost); - const token = await this.getValidToken(); + // Warm the token so authenticatedFetch() has something cached, but do not + // bake it into the MCP config — the proxy injects a fresh one on every + // forwarded request. + const sessionStartToken = await this.getValidToken(); + log.info("buildMcpServers — session-start token", { + tokenPrefix: sessionStartToken.slice(0, 16), + tokenSuffix: sessionStartToken.slice(-8), + tokenLength: sessionStartToken.length, + }); + + await this.mcpProxy.start(); + const proxiedPosthogUrl = this.mcpProxy.register("posthog", mcpUrl); servers.push({ name: "posthog", type: "http", - url: mcpUrl, + url: proxiedPosthogUrl, headers: [ - { name: "Authorization", value: `Bearer ${token}` }, { name: "x-posthog-project-id", value: String(credentials.projectId), @@ -72,10 +85,12 @@ export class AgentAuthAdapter { for (const installation of installations) { if (installation.url === mcpUrl) continue; + const name = + installation.name || installation.display_name || installation.url; + if (installation.auth_type === "none") { servers.push({ - name: - installation.name || installation.display_name || installation.url, + name, type: "http", url: installation.url, headers: [], @@ -83,12 +98,15 @@ export class AgentAuthAdapter { continue; } + const proxiedUrl = this.mcpProxy.register( + `installation-${installation.id}`, + installation.proxy_url, + ); servers.push({ - name: - installation.name || installation.display_name || installation.url, + name, type: "http", - url: installation.proxy_url, - headers: [{ name: "Authorization", value: `Bearer ${token}` }], + url: proxiedUrl, + headers: [], }); } diff --git a/apps/code/src/main/services/agent/service.ts b/apps/code/src/main/services/agent/service.ts index e15e7f941..313d68eaf 100644 --- a/apps/code/src/main/services/agent/service.ts +++ b/apps/code/src/main/services/agent/service.ts @@ -44,7 +44,6 @@ import { MAIN_TOKENS } from "../../di/tokens"; import { isDevBuild } from "../../utils/env"; import { logger } from "../../utils/logger"; import { TypedEventEmitter } from "../../utils/typed-event-emitter"; -import { AuthServiceEvent } from "../auth/schemas"; import type { AuthService } from "../auth/service"; import type { FsService } from "../fs/service"; import type { McpAppsService } from "../mcp-apps/service"; @@ -325,7 +324,6 @@ export class AgentService extends TypedEventEmitter { private agentAuthAdapter: AgentAuthAdapter; private mcpAppsService: McpAppsService; private authService: AuthService; - private mcpRefreshDebounce: NodeJS.Timeout | null = null; constructor( @inject(MAIN_TOKENS.ProcessTrackingService) @@ -355,11 +353,6 @@ export class AgentService extends TypedEventEmitter { powerMonitor.on("resume", () => this.checkIdleDeadlines()); this.authService.setRefreshBlocker(() => this.hasActiveSessions()); - this.authService.on(AuthServiceEvent.StateChanged, (state) => { - if (state.status === "authenticated") { - this.scheduleRefreshMcp(); - } - }); } /** @@ -1089,53 +1082,10 @@ For git operations while detached: return `Your worktree is back on branch \`${context.branchName}\`. Normal git commands work again.`; } - /** Debounce rapid StateChanged events into a single MCP refresh. */ - private scheduleRefreshMcp(): void { - if (this.mcpRefreshDebounce) { - clearTimeout(this.mcpRefreshDebounce); - } - this.mcpRefreshDebounce = setTimeout(() => { - this.mcpRefreshDebounce = null; - this.refreshMcpForActiveSessions(); - }, 500); - } - - private async refreshMcpForActiveSessions(): Promise { - for (const [taskRunId, session] of this.sessions) { - try { - const mcpServers = await this.agentAuthAdapter.buildMcpServers( - session.config.credentials, - ); - log.info("Refreshing MCP servers for session", { - taskRunId, - serverCount: mcpServers.length, - servers: mcpServers.map((s) => ({ - name: s.name, - url: s.url, - headerCount: s.headers.length, - })), - }); - await session.clientSideConnection.extNotification( - POSTHOG_NOTIFICATIONS.REFRESH_MCP, - { mcpServers }, - ); - } catch (err) { - log.warn("Failed to push MCP refresh", { - taskRunId, - error: err instanceof Error ? err.message : String(err), - }); - } - } - } - @preDestroy() async cleanupAll(): Promise { for (const { handle } of this.idleTimeouts.values()) clearTimeout(handle); this.idleTimeouts.clear(); - if (this.mcpRefreshDebounce) { - clearTimeout(this.mcpRefreshDebounce); - this.mcpRefreshDebounce = null; - } const sessionIds = Array.from(this.sessions.keys()); log.info("Cleaning up all agent sessions", { sessionCount: sessionIds.length, @@ -1399,6 +1349,14 @@ For git operations while detached: update.status === "failed" ) { session?.inFlightMcpToolCalls.delete(update.toolCallId); + if (update.status === "failed") { + log.warn("MCP tool failed — raw update", { + toolName, + toolCallId: update.toolCallId, + rawOutput: JSON.stringify(update.rawOutput), + content: JSON.stringify(update.content), + }); + } service.mcpAppsService.notifyToolResult( toolName, update.toolCallId, diff --git a/apps/code/src/main/services/mcp-proxy/service.ts b/apps/code/src/main/services/mcp-proxy/service.ts new file mode 100644 index 000000000..99309a2b7 --- /dev/null +++ b/apps/code/src/main/services/mcp-proxy/service.ts @@ -0,0 +1,287 @@ +import http from "node:http"; +import { inject, injectable } from "inversify"; +import { MAIN_TOKENS } from "../../di/tokens"; +import { logger } from "../../utils/logger"; +import type { AuthService } from "../auth/service"; + +const log = logger.scope("mcp-proxy"); + +/** + * Local HTTP proxy for MCP servers. Allows routing MCP requests through a + * stable loopback URL while injecting a fresh access token on every forwarded + * request. MCP transports bake their headers at construction time, so without + * this proxy we would either need to tear the transport down on every token + * rotation (expensive, racy) or leave it serving stale tokens. + */ +@injectable() +export class McpProxyService { + private server: http.Server | null = null; + private port: number | null = null; + private targets = new Map(); + + constructor( + @inject(MAIN_TOKENS.AuthService) + private readonly authService: AuthService, + ) {} + + async start(): Promise { + if (this.server) return; + + this.server = http.createServer((req, res) => { + this.handleRequest(req, res); + }); + + return new Promise((resolve, reject) => { + this.server?.listen(0, "127.0.0.1", () => { + const addr = this.server?.address(); + if (typeof addr === "object" && addr) { + this.port = addr.port; + log.info("MCP proxy started", { port: this.port }); + resolve(); + } else { + reject(new Error("Failed to get proxy address")); + } + }); + + this.server?.on("error", (err) => { + log.error("MCP proxy server error", err); + reject(err); + }); + }); + } + + /** + * Register a target URL under a stable ID. Returns the loopback URL that + * should be passed to the MCP transport. Subsequent registrations with the + * same ID overwrite the target. + */ + register(id: string, targetUrl: string): string { + if (!this.port) { + throw new Error("MCP proxy not started"); + } + this.targets.set(id, targetUrl); + return `http://127.0.0.1:${this.port}/${encodeURIComponent(id)}`; + } + + async stop(): Promise { + if (!this.server) return; + return new Promise((resolve) => { + this.server?.close(() => { + log.info("MCP proxy stopped"); + this.server = null; + this.port = null; + this.targets.clear(); + resolve(); + }); + }); + } + + private handleRequest( + req: http.IncomingMessage, + res: http.ServerResponse, + ): void { + const incoming = req.url ?? "/"; + const [, rawId, ...rest] = incoming.split("/"); + const id = rawId ? decodeURIComponent(rawId) : ""; + const target = this.targets.get(id); + + if (!target) { + log.warn("Unknown MCP proxy target", { id, url: incoming }); + res.writeHead(404); + res.end("Unknown target"); + return; + } + + const suffix = rest.join("/"); + const targetUrl = suffix ? `${target}/${suffix}` : target; + + const strippedAuthHeaders = new Set([ + "authorization", + "proxy-authorization", + ]); + const headers: Record = {}; + for (const [key, value] of Object.entries(req.headers)) { + if ( + key === "host" || + key === "connection" || + strippedAuthHeaders.has(key) + ) { + continue; + } + if (typeof value === "string") { + headers[key] = value; + } + } + + const fetchOptions: RequestInit = { + method: req.method ?? "GET", + headers, + }; + + if (req.method !== "GET" && req.method !== "HEAD") { + const chunks: Buffer[] = []; + req.on("data", (chunk: Buffer) => chunks.push(chunk)); + req.on("end", () => { + fetchOptions.body = Buffer.concat(chunks); + this.forwardRequest(id, targetUrl, fetchOptions, res); + }); + } else { + this.forwardRequest(id, targetUrl, fetchOptions, res); + } + } + + private async forwardRequest( + id: string, + url: string, + options: RequestInit, + res: http.ServerResponse, + ): Promise { + try { + const preToken = await this.authService.getValidAccessToken(); + log.info("MCP proxy BEFORE request", { + id, + url, + tokenPrefix: preToken.accessToken.slice(0, 16), + tokenSuffix: preToken.accessToken.slice(-8), + tokenLength: preToken.accessToken.length, + }); + + let response = await this.authService.authenticatedFetch( + fetch, + url, + options, + ); + + const postToken = await this.authService.getValidAccessToken(); + log.info("MCP proxy AFTER request", { + id, + url, + tokenPrefix: postToken.accessToken.slice(0, 16), + tokenSuffix: postToken.accessToken.slice(-8), + tokenLength: postToken.accessToken.length, + tokenChangedDuringRequest: + preToken.accessToken !== postToken.accessToken, + }); + + // MCP servers return HTTP 200 with auth failures encoded in the JSON-RPC + // body, so authenticatedFetch's 401/403 retry never kicks in. Detect the + // known error shape and retry once with a force-refreshed token. + const contentType = response.headers.get("content-type") ?? ""; + const isSse = contentType.includes("text/event-stream"); + + if (!isSse) { + const buf = Buffer.from(await response.arrayBuffer()); + const bodyText = buf.toString("utf8"); + + if (this.isAuthErrorBody(bodyText)) { + log.warn("MCP auth error in body — refreshing token and retrying", { + id, + url, + }); + await this.authService.refreshAccessToken(); + response = await this.authService.authenticatedFetch( + fetch, + url, + options, + ); + const retryContentType = response.headers.get("content-type") ?? ""; + if (!retryContentType.includes("text/event-stream")) { + const retryBuf = Buffer.from(await response.arrayBuffer()); + this.writeBufferedResponse(response, retryBuf, res); + return; + } + // Fall through to streaming path below for SSE retry responses. + this.writeStreamingResponse(response, res); + return; + } + + if (/"isError"\s*:\s*true/.test(bodyText) || response.status >= 400) { + log.warn("MCP proxy non-OK body", { + id, + url, + status: response.status, + body: bodyText.slice(0, 2000), + }); + } else { + log.debug("MCP proxy response", { + id, + url, + status: response.status, + }); + } + + this.writeBufferedResponse(response, buf, res); + return; + } + + log.debug("MCP proxy response", { + id, + url, + status: response.status, + streaming: true, + }); + this.writeStreamingResponse(response, res); + } catch (err) { + log.error("MCP proxy forward error", { id, url, err }); + if (!res.headersSent) { + res.writeHead(502); + } + res.end("Proxy error"); + } + } + + private isAuthErrorBody(bodyText: string): boolean { + return ( + bodyText.includes('"authentication_failed"') || + bodyText.includes('"authentication_error"') + ); + } + + private buildResponseHeaders(response: Response): Record { + const stripHeaders = new Set([ + "transfer-encoding", + "content-encoding", + "content-length", + ]); + const headers: Record = {}; + response.headers.forEach((value: string, key: string) => { + if (stripHeaders.has(key)) return; + headers[key] = value; + }); + return headers; + } + + private writeBufferedResponse( + response: Response, + buf: Buffer, + res: http.ServerResponse, + ): void { + res.writeHead(response.status, this.buildResponseHeaders(response)); + res.end(buf); + } + + private async writeStreamingResponse( + response: Response, + res: http.ServerResponse, + ): Promise { + res.writeHead(response.status, this.buildResponseHeaders(response)); + if (!response.body) { + res.end(); + return; + } + const reader = response.body.getReader(); + const pump = async (): Promise => { + const { done, value } = await reader.read(); + if (done) { + res.end(); + return; + } + const canContinue = res.write(value); + if (canContinue) { + return pump(); + } + res.once("drain", () => pump()); + }; + await pump(); + } +} diff --git a/packages/agent/src/adapters/claude/claude-agent.ts b/packages/agent/src/adapters/claude/claude-agent.ts index 310614c97..fb6fcf48a 100644 --- a/packages/agent/src/adapters/claude/claude-agent.ts +++ b/packages/agent/src/adapters/claude/claude-agent.ts @@ -44,7 +44,7 @@ import { } from "@anthropic-ai/claude-agent-sdk"; import { v7 as uuidv7 } from "uuid"; import packageJson from "../../../package.json" with { type: "json" }; -import { isNotification, POSTHOG_NOTIFICATIONS } from "../../acp-extensions"; +import { POSTHOG_NOTIFICATIONS } from "../../acp-extensions"; import { unreachable, withTimeout } from "../../utils/common"; import { Logger } from "../../utils/logger"; import { Pushable } from "../../utils/streams"; @@ -640,26 +640,6 @@ export class ClaudeAcpAgent extends BaseAcpAgent { await this.session.query.interrupt(); } - async extNotification( - method: string, - params: Record, - ): Promise { - if (isNotification(method, POSTHOG_NOTIFICATIONS.REFRESH_MCP)) { - const mcpServers = parseMcpServers( - params as Pick, - ); - this.logger.info("Refreshing MCP servers", { - serverCount: Object.keys(mcpServers).length, - }); - const result = await this.session.query.setMcpServers(mcpServers); - this.logger.info("MCP servers refreshed", { - added: result.added, - removed: result.removed, - errors: result.errors, - }); - } - } - async unstable_setSessionModel( params: SetSessionModelRequest, ): Promise { @@ -857,14 +837,9 @@ export class ClaudeAcpAgent extends BaseAcpAgent { CODE_EXECUTION_MODES.includes(meta.permissionMode as CodeExecutionMode) ? (meta.permissionMode as CodeExecutionMode) : "default"; - // Route our MCP servers through setMcpServers() below rather than Options.mcpServers. - // Servers passed via Options go into Claude Code's appState.mcp, which setMcpServers() - // does NOT touch — it only reconciles dynamicMcpState. Registering via setMcpServers - // from the start keeps everything in dynamicMcpState so later refresh calls cleanly - // replace the existing connection instead of creating a second one. const options = buildSessionOptions({ cwd, - mcpServers: {}, + mcpServers, permissionMode, canUseTool: this.createCanUseTool(sessionId, meta?.allowedDomains), logger: this.logger, @@ -1001,15 +976,6 @@ export class ClaudeAcpAgent extends BaseAcpAgent { } } - if (Object.keys(mcpServers).length > 0) { - const result = await q.setMcpServers(mcpServers); - this.logger.info("MCP servers registered dynamically", { - sessionId, - added: result.added, - errors: result.errors, - }); - } - const settingsModel = settingsManager.getSettings().model; const metaModel = meta?.model; const resolvedModelId = From a8e48140282abc868fc770e61d94d2697d9e6762 Mon Sep 17 00:00:00 2001 From: Georgiy Tarasov Date: Fri, 17 Apr 2026 17:06:14 +0200 Subject: [PATCH 5/6] remove scheduled background token refresh MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit No longer needed — the MCP proxy fetches a fresh token per request via authenticatedFetch, so proactive refresh ahead of expiry doesn't add value. Drop the timer, blocker/pending mechanism, the "Schedule refresh" menu item, and the session-idle flush wiring. Also strip two debug logs left from the investigation. --- apps/code/src/main/menu.ts | 23 ------- .../src/main/services/agent/auth-adapter.ts | 7 +- .../src/main/services/agent/service.test.ts | 7 -- apps/code/src/main/services/agent/service.ts | 16 ----- apps/code/src/main/services/auth/service.ts | 67 ------------------- 5 files changed, 1 insertion(+), 119 deletions(-) diff --git a/apps/code/src/main/menu.ts b/apps/code/src/main/menu.ts index 6805ba501..ba6418676 100644 --- a/apps/code/src/main/menu.ts +++ b/apps/code/src/main/menu.ts @@ -133,29 +133,6 @@ function buildFileMenu(): MenuItemConstructorOptions { .invalidateToken(); }, }, - { - label: "Schedule refresh of OAuth token", - click: () => { - container - .get(MAIN_TOKENS.AuthService) - .scheduleRefresh() - .then(() => { - dialog.showMessageBox({ - type: "info", - title: "OAuth Token Refresh", - message: - "Token refresh scheduled.\nIt will execute when all active agent turns complete.", - }); - }) - .catch((err: Error) => { - dialog.showMessageBox({ - type: "error", - title: "OAuth Token Refresh Failed", - message: err.message, - }); - }); - }, - }, { label: "Force refresh of OAuth token", click: () => { diff --git a/apps/code/src/main/services/agent/auth-adapter.ts b/apps/code/src/main/services/agent/auth-adapter.ts index 6fc68682f..9136cd485 100644 --- a/apps/code/src/main/services/agent/auth-adapter.ts +++ b/apps/code/src/main/services/agent/auth-adapter.ts @@ -57,12 +57,7 @@ export class AgentAuthAdapter { // Warm the token so authenticatedFetch() has something cached, but do not // bake it into the MCP config — the proxy injects a fresh one on every // forwarded request. - const sessionStartToken = await this.getValidToken(); - log.info("buildMcpServers — session-start token", { - tokenPrefix: sessionStartToken.slice(0, 16), - tokenSuffix: sessionStartToken.slice(-8), - tokenLength: sessionStartToken.length, - }); + await this.getValidToken(); await this.mcpProxy.start(); const proxiedPosthogUrl = this.mcpProxy.register("posthog", mcpUrl); diff --git a/apps/code/src/main/services/agent/service.test.ts b/apps/code/src/main/services/agent/service.test.ts index eb2e5697f..02e1642cc 100644 --- a/apps/code/src/main/services/agent/service.test.ts +++ b/apps/code/src/main/services/agent/service.test.ts @@ -176,12 +176,6 @@ function createMockDependencies() { notifyToolResult: vi.fn(), notifyToolCancelled: vi.fn(), }, - authService: { - on: vi.fn(), - off: vi.fn(), - setRefreshBlocker: vi.fn(), - flushPendingRefresh: vi.fn().mockResolvedValue(undefined), - }, }; } @@ -207,7 +201,6 @@ describe("AgentService", () => { deps.posthogPluginService as never, deps.agentAuthAdapter as never, deps.mcpAppsService as never, - deps.authService as never, ); }); diff --git a/apps/code/src/main/services/agent/service.ts b/apps/code/src/main/services/agent/service.ts index 313d68eaf..5ae20bf0b 100644 --- a/apps/code/src/main/services/agent/service.ts +++ b/apps/code/src/main/services/agent/service.ts @@ -44,7 +44,6 @@ import { MAIN_TOKENS } from "../../di/tokens"; import { isDevBuild } from "../../utils/env"; import { logger } from "../../utils/logger"; import { TypedEventEmitter } from "../../utils/typed-event-emitter"; -import type { AuthService } from "../auth/service"; import type { FsService } from "../fs/service"; import type { McpAppsService } from "../mcp-apps/service"; import type { PosthogPluginService } from "../posthog-plugin/service"; @@ -323,7 +322,6 @@ export class AgentService extends TypedEventEmitter { private posthogPluginService: PosthogPluginService; private agentAuthAdapter: AgentAuthAdapter; private mcpAppsService: McpAppsService; - private authService: AuthService; constructor( @inject(MAIN_TOKENS.ProcessTrackingService) @@ -338,8 +336,6 @@ export class AgentService extends TypedEventEmitter { agentAuthAdapter: AgentAuthAdapter, @inject(MAIN_TOKENS.McpAppsService) mcpAppsService: McpAppsService, - @inject(MAIN_TOKENS.AuthService) - authService: AuthService, ) { super(); this.processTracking = processTracking; @@ -348,11 +344,8 @@ export class AgentService extends TypedEventEmitter { this.posthogPluginService = posthogPluginService; this.agentAuthAdapter = agentAuthAdapter; this.mcpAppsService = mcpAppsService; - this.authService = authService; powerMonitor.on("resume", () => this.checkIdleDeadlines()); - - this.authService.setRefreshBlocker(() => this.hasActiveSessions()); } /** @@ -891,7 +884,6 @@ When creating pull requests, add the following footer at the end of the PR descr this.sleepService.release(sessionId); if (!this.hasActiveSessions()) { - this.authService.flushPendingRefresh(); this.emit(AgentServiceEvent.SessionsIdle, undefined); } } @@ -1349,14 +1341,6 @@ For git operations while detached: update.status === "failed" ) { session?.inFlightMcpToolCalls.delete(update.toolCallId); - if (update.status === "failed") { - log.warn("MCP tool failed — raw update", { - toolName, - toolCallId: update.toolCallId, - rawOutput: JSON.stringify(update.rawOutput), - content: JSON.stringify(update.content), - }); - } service.mcpAppsService.notifyToolResult( toolName, update.toolCallId, diff --git a/apps/code/src/main/services/auth/service.ts b/apps/code/src/main/services/auth/service.ts index fb4d8bc9a..973899889 100644 --- a/apps/code/src/main/services/auth/service.ts +++ b/apps/code/src/main/services/auth/service.ts @@ -31,12 +31,6 @@ import { const log = logger.scope("auth-service"); const TOKEN_EXPIRY_SKEW_MS = 60_000; -// Proactively refresh 30 min before expiry. The lazy skew check in -// ensureValidSession() isn't enough on its own: long-running agent turns can -// hold a token reference for many minutes, and the PostHog MCP server needs a -// valid bearer on *every* turn. A background refresh keeps the in-memory token -// fresh even when nothing else in the app happens to call getValidAccessToken(). -const TOKEN_REFRESH_BUFFER_MS = 30 * 60 * 1000; type FetchLike = ( input: string | Request, init?: RequestInit, @@ -79,9 +73,6 @@ export class AuthService extends TypedEventEmitter { private session: InMemorySession | null = null; private initializePromise: Promise | null = null; private refreshPromise: Promise | null = null; - private refreshTimeoutId: NodeJS.Timeout | null = null; - private isRefreshBlocked: (() => boolean) | null = null; - private pendingRefresh = false; constructor( @inject(MAIN_TOKENS.AuthPreferenceRepository) private readonly authPreferenceRepository: IAuthPreferenceRepository, @@ -139,26 +130,6 @@ export class AuthService extends TypedEventEmitter { apiHost: getCloudUrlFromRegion(session.cloudRegion), }; } - /** Register a callback that returns true when proactive token refreshes should be deferred. */ - setRefreshBlocker(fn: () => boolean): void { - this.isRefreshBlocked = fn; - } - /** Request a token refresh, deferring if a blocker is active. */ - async scheduleRefresh(): Promise { - if (this.isRefreshBlocked?.()) { - this.pendingRefresh = true; - return; - } - await this.refreshAccessToken(); - } - /** Execute a deferred refresh if one is pending. Called by the blocker owner when it becomes unblocked. */ - async flushPendingRefresh(): Promise { - if (!this.pendingRefresh) return; - this.pendingRefresh = false; - await this.refreshAccessToken().catch((error) => { - log.warn("Deferred token refresh failed", { error }); - }); - } async invalidateAccessTokenForTest(): Promise { await this.initialize(); @@ -250,10 +221,6 @@ export class AuthService extends TypedEventEmitter { async logout(): Promise { const { cloudRegion, projectId } = this.state; - if (this.refreshTimeoutId) { - clearTimeout(this.refreshTimeoutId); - this.refreshTimeoutId = null; - } this.authSessionRepository.clearCurrent(); this.session = null; this.setAnonymousState({ cloudRegion, projectId }); @@ -490,38 +457,8 @@ export class AuthService extends TypedEventEmitter { availableOrgIds: session.availableOrgIds, needsScopeReauth: false, }); - this.scheduleTokenRefresh(); await this.updateCodeAccessFromSession(); } - private scheduleTokenRefresh(): void { - if (this.refreshTimeoutId) { - clearTimeout(this.refreshTimeoutId); - this.refreshTimeoutId = null; - } - - if (!this.session) { - return; - } - - const timeUntilRefresh = - this.session.accessTokenExpiresAt - Date.now() - TOKEN_REFRESH_BUFFER_MS; - - const fire = () => { - this.refreshTimeoutId = null; - if (this.isRefreshBlocked?.()) { - this.pendingRefresh = true; - return; - } - this.refreshAccessToken().catch((error) => { - log.warn("Proactive token refresh failed", { error }); - }); - }; - - // Always schedule via setTimeout — never call fire() synchronously. - // A synchronous call during syncAuthenticatedSession() would cascade: - // fire → refreshAccessToken → syncAuthenticatedSession → scheduleTokenRefresh → fire … - this.refreshTimeoutId = setTimeout(fire, Math.max(timeUntilRefresh, 0)); - } private persistSession(input: { refreshToken: string; cloudRegion: CloudRegion; @@ -661,10 +598,6 @@ export class AuthService extends TypedEventEmitter { } @preDestroy() shutdown(): void { - if (this.refreshTimeoutId) { - clearTimeout(this.refreshTimeoutId); - this.refreshTimeoutId = null; - } this.connectivityUnsubscribe?.(); this.connectivityUnsubscribe = null; powerMonitor.off("resume", this.handleResume); From 2fdb4d7211710c18f9ca6d644a3f966fc4fdd917 Mon Sep 17 00:00:00 2001 From: Georgiy Tarasov Date: Fri, 17 Apr 2026 17:31:15 +0200 Subject: [PATCH 6/6] fix review comments --- .../main/services/mcp-proxy/service.test.ts | 269 ++++++++++++++++++ .../src/main/services/mcp-proxy/service.ts | 76 ++--- packages/agent/src/acp-extensions.ts | 3 - .../agent/src/adapters/claude/claude-agent.ts | 1 + 4 files changed, 309 insertions(+), 40 deletions(-) create mode 100644 apps/code/src/main/services/mcp-proxy/service.test.ts diff --git a/apps/code/src/main/services/mcp-proxy/service.test.ts b/apps/code/src/main/services/mcp-proxy/service.test.ts new file mode 100644 index 000000000..290aaf202 --- /dev/null +++ b/apps/code/src/main/services/mcp-proxy/service.test.ts @@ -0,0 +1,269 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { AuthService } from "../auth/service"; +import { McpProxyService } from "./service"; + +vi.mock("../../utils/logger.js", () => ({ + logger: { + scope: () => ({ + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }), + }, +})); + +type AuthServiceMock = { + authenticatedFetch: ReturnType; + refreshAccessToken: ReturnType; + getValidAccessToken: ReturnType; +}; + +function createAuthServiceMock(): AuthServiceMock { + return { + authenticatedFetch: vi.fn(), + refreshAccessToken: vi.fn().mockResolvedValue({ + accessToken: "refreshed-token", + apiHost: "https://app.posthog.com", + }), + getValidAccessToken: vi.fn().mockResolvedValue({ + accessToken: "access-token", + apiHost: "https://app.posthog.com", + }), + }; +} + +describe("McpProxyService", () => { + let authServiceMock: AuthServiceMock; + let service: McpProxyService; + + beforeEach(() => { + authServiceMock = createAuthServiceMock(); + service = new McpProxyService(authServiceMock as unknown as AuthService); + }); + + afterEach(async () => { + await service.stop(); + vi.restoreAllMocks(); + }); + + describe("lifecycle", () => { + it("starts on a loopback port and returns a URL for register()", async () => { + await service.start(); + const url = service.register("alpha", "https://upstream.example/path"); + expect(url).toMatch(/^http:\/\/127\.0\.0\.1:\d+\/alpha$/); + }); + + it("throws from register() before start()", () => { + expect(() => + service.register("alpha", "https://upstream.example"), + ).toThrowError(/not started/); + }); + + it("handles concurrent start() calls without races", async () => { + await Promise.all([service.start(), service.start(), service.start()]); + const url = service.register("alpha", "https://upstream.example"); + expect(url).toMatch(/^http:\/\/127\.0\.0\.1:\d+\/alpha$/); + }); + + it("stop() closes the server and clears registered targets", async () => { + await service.start(); + service.register("alpha", "https://upstream.example"); + await service.stop(); + expect(() => + service.register("alpha", "https://upstream.example"), + ).toThrowError(/not started/); + }); + }); + + describe("request forwarding", () => { + it("returns 404 for unknown targets", async () => { + await service.start(); + const proxyUrl = service.register("alpha", "https://upstream.example"); + const unknownUrl = proxyUrl.replace("/alpha", "/bravo"); + + const res = await fetch(unknownUrl); + + expect(res.status).toBe(404); + expect(await res.text()).toBe("Unknown target"); + expect(authServiceMock.authenticatedFetch).not.toHaveBeenCalled(); + }); + + it("forwards GET requests and returns the upstream body and status", async () => { + authServiceMock.authenticatedFetch.mockResolvedValue( + new Response('{"ok":true}', { + status: 200, + headers: { "content-type": "application/json" }, + }), + ); + + await service.start(); + const proxyUrl = service.register("alpha", "https://upstream.example"); + + const res = await fetch(proxyUrl); + + expect(res.status).toBe(200); + expect(await res.text()).toBe('{"ok":true}'); + expect(authServiceMock.authenticatedFetch).toHaveBeenCalledTimes(1); + const [, url] = authServiceMock.authenticatedFetch.mock.calls[0]; + expect(url).toBe("https://upstream.example"); + }); + + it("forwards POST body bytes to the upstream URL", async () => { + authServiceMock.authenticatedFetch.mockResolvedValue( + new Response('{"ok":true}', { + status: 200, + headers: { "content-type": "application/json" }, + }), + ); + + await service.start(); + const proxyUrl = service.register("alpha", "https://upstream.example"); + + await fetch(proxyUrl, { + method: "POST", + headers: { "content-type": "application/json" }, + body: '{"hello":"world"}', + }); + + expect(authServiceMock.authenticatedFetch).toHaveBeenCalledTimes(1); + const [, , options] = authServiceMock.authenticatedFetch.mock.calls[0]; + expect(options.method).toBe("POST"); + expect(Buffer.from(options.body).toString("utf8")).toBe( + '{"hello":"world"}', + ); + }); + + it("strips Authorization and Host headers before forwarding", async () => { + authServiceMock.authenticatedFetch.mockResolvedValue( + new Response('{"ok":true}', { + status: 200, + headers: { "content-type": "application/json" }, + }), + ); + + await service.start(); + const proxyUrl = service.register("alpha", "https://upstream.example"); + + await fetch(proxyUrl, { + headers: { + Authorization: "Bearer leaked", + "X-Custom": "keep-me", + }, + }); + + const [, , options] = authServiceMock.authenticatedFetch.mock.calls[0]; + const forwardedHeaderKeys = Object.keys(options.headers).map((k) => + k.toLowerCase(), + ); + expect(forwardedHeaderKeys).not.toContain("authorization"); + expect(forwardedHeaderKeys).not.toContain("host"); + expect(forwardedHeaderKeys).not.toContain("connection"); + expect(options.headers["x-custom"]).toBe("keep-me"); + }); + + it("joins path suffix without producing a double slash for trailing-slash targets", async () => { + authServiceMock.authenticatedFetch.mockResolvedValue( + new Response("{}", { + status: 200, + headers: { "content-type": "application/json" }, + }), + ); + + await service.start(); + service.register("alpha", "https://upstream.example/inst-2/"); + const port = new URL( + service.register("alpha", "https://upstream.example/inst-2/"), + ).port; + + await fetch(`http://127.0.0.1:${port}/alpha/tools/list`); + + const [, url] = + authServiceMock.authenticatedFetch.mock.calls.at(-1) ?? []; + expect(url).toBe("https://upstream.example/inst-2/tools/list"); + }); + + it("preserves the incoming query string on the upstream URL", async () => { + authServiceMock.authenticatedFetch.mockResolvedValue( + new Response("{}", { + status: 200, + headers: { "content-type": "application/json" }, + }), + ); + + await service.start(); + const proxyUrl = service.register("alpha", "https://upstream.example"); + + await fetch(`${proxyUrl}?token=abc&foo=bar`); + + const [, url] = authServiceMock.authenticatedFetch.mock.calls[0]; + expect(url).toBe("https://upstream.example?token=abc&foo=bar"); + }); + }); + + describe("auth error retry", () => { + it("refreshes the token and retries once when the body contains authentication_failed", async () => { + authServiceMock.authenticatedFetch + .mockResolvedValueOnce( + new Response( + JSON.stringify({ error: { code: "authentication_failed" } }), + { status: 200, headers: { "content-type": "application/json" } }, + ), + ) + .mockResolvedValueOnce( + new Response('{"ok":true}', { + status: 200, + headers: { "content-type": "application/json" }, + }), + ); + + await service.start(); + const proxyUrl = service.register("alpha", "https://upstream.example"); + + const res = await fetch(proxyUrl, { method: "POST", body: "payload" }); + + expect(res.status).toBe(200); + expect(await res.text()).toBe('{"ok":true}'); + expect(authServiceMock.refreshAccessToken).toHaveBeenCalledTimes(1); + expect(authServiceMock.authenticatedFetch).toHaveBeenCalledTimes(2); + }); + + it("does not retry when the body looks healthy", async () => { + authServiceMock.authenticatedFetch.mockResolvedValue( + new Response('{"ok":true}', { + status: 200, + headers: { "content-type": "application/json" }, + }), + ); + + await service.start(); + const proxyUrl = service.register("alpha", "https://upstream.example"); + + await fetch(proxyUrl); + + expect(authServiceMock.refreshAccessToken).not.toHaveBeenCalled(); + expect(authServiceMock.authenticatedFetch).toHaveBeenCalledTimes(1); + }); + }); + + describe("SSE streaming", () => { + it("streams event-stream responses through to the client", async () => { + const sseBody = "data: one\n\ndata: two\n\n"; + authServiceMock.authenticatedFetch.mockResolvedValue( + new Response(sseBody, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }), + ); + + await service.start(); + const proxyUrl = service.register("alpha", "https://upstream.example"); + + const res = await fetch(proxyUrl); + + expect(res.headers.get("content-type")).toContain("text/event-stream"); + expect(await res.text()).toBe(sseBody); + expect(authServiceMock.refreshAccessToken).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/apps/code/src/main/services/mcp-proxy/service.ts b/apps/code/src/main/services/mcp-proxy/service.ts index 99309a2b7..5f6b5d832 100644 --- a/apps/code/src/main/services/mcp-proxy/service.ts +++ b/apps/code/src/main/services/mcp-proxy/service.ts @@ -1,5 +1,5 @@ import http from "node:http"; -import { inject, injectable } from "inversify"; +import { inject, injectable, preDestroy } from "inversify"; import { MAIN_TOKENS } from "../../di/tokens"; import { logger } from "../../utils/logger"; import type { AuthService } from "../auth/service"; @@ -12,11 +12,16 @@ const log = logger.scope("mcp-proxy"); * request. MCP transports bake their headers at construction time, so without * this proxy we would either need to tear the transport down on every token * rotation (expensive, racy) or leave it serving stale tokens. + * + * The proxy only listens on 127.0.0.1 and strips inbound Authorization headers + * before forwarding, but any local process can still use it to issue requests + * on the user's behalf — acceptable for a single-user desktop app. */ @injectable() export class McpProxyService { private server: http.Server | null = null; private port: number | null = null; + private startPromise: Promise | null = null; private targets = new Map(); constructor( @@ -25,15 +30,24 @@ export class McpProxyService { ) {} async start(): Promise { - if (this.server) return; + if (this.server && this.port) return; + if (this.startPromise) return this.startPromise; + this.startPromise = this.doStart().catch((err) => { + this.startPromise = null; + throw err; + }); + return this.startPromise; + } - this.server = http.createServer((req, res) => { + private async doStart(): Promise { + const server = http.createServer((req, res) => { this.handleRequest(req, res); }); + this.server = server; - return new Promise((resolve, reject) => { - this.server?.listen(0, "127.0.0.1", () => { - const addr = this.server?.address(); + await new Promise((resolve, reject) => { + server.listen(0, "127.0.0.1", () => { + const addr = server.address(); if (typeof addr === "object" && addr) { this.port = addr.port; log.info("MCP proxy started", { port: this.port }); @@ -43,7 +57,7 @@ export class McpProxyService { } }); - this.server?.on("error", (err) => { + server.on("error", (err) => { log.error("MCP proxy server error", err); reject(err); }); @@ -63,37 +77,43 @@ export class McpProxyService { return `http://127.0.0.1:${this.port}/${encodeURIComponent(id)}`; } + @preDestroy() async stop(): Promise { if (!this.server) return; - return new Promise((resolve) => { - this.server?.close(() => { + const server = this.server; + await new Promise((resolve) => { + server.close(() => { log.info("MCP proxy stopped"); - this.server = null; - this.port = null; - this.targets.clear(); resolve(); }); }); + this.server = null; + this.port = null; + this.startPromise = null; + this.targets.clear(); } private handleRequest( req: http.IncomingMessage, res: http.ServerResponse, ): void { - const incoming = req.url ?? "/"; - const [, rawId, ...rest] = incoming.split("/"); + const incoming = new URL(req.url ?? "/", "http://placeholder"); + const segments = incoming.pathname.split("/").filter(Boolean); + const [rawId, ...rest] = segments; const id = rawId ? decodeURIComponent(rawId) : ""; const target = this.targets.get(id); if (!target) { - log.warn("Unknown MCP proxy target", { id, url: incoming }); + log.warn("Unknown MCP proxy target", { id, url: req.url }); res.writeHead(404); res.end("Unknown target"); return; } const suffix = rest.join("/"); - const targetUrl = suffix ? `${target}/${suffix}` : target; + const targetBase = target.replace(/\/+$/, ""); + const targetUrl = + (suffix ? `${targetBase}/${suffix}` : targetBase) + incoming.search; const strippedAuthHeaders = new Set([ "authorization", @@ -137,32 +157,12 @@ export class McpProxyService { res: http.ServerResponse, ): Promise { try { - const preToken = await this.authService.getValidAccessToken(); - log.info("MCP proxy BEFORE request", { - id, - url, - tokenPrefix: preToken.accessToken.slice(0, 16), - tokenSuffix: preToken.accessToken.slice(-8), - tokenLength: preToken.accessToken.length, - }); - let response = await this.authService.authenticatedFetch( fetch, url, options, ); - const postToken = await this.authService.getValidAccessToken(); - log.info("MCP proxy AFTER request", { - id, - url, - tokenPrefix: postToken.accessToken.slice(0, 16), - tokenSuffix: postToken.accessToken.slice(-8), - tokenLength: postToken.accessToken.length, - tokenChangedDuringRequest: - preToken.accessToken !== postToken.accessToken, - }); - // MCP servers return HTTP 200 with auth failures encoded in the JSON-RPC // body, so authenticatedFetch's 401/403 retry never kicks in. Detect the // known error shape and retry once with a force-refreshed token. @@ -190,7 +190,6 @@ export class McpProxyService { this.writeBufferedResponse(response, retryBuf, res); return; } - // Fall through to streaming path below for SSE retry responses. this.writeStreamingResponse(response, res); return; } @@ -270,6 +269,9 @@ export class McpProxyService { return; } const reader = response.body.getReader(); + res.on("close", () => { + void reader.cancel().catch(() => {}); + }); const pump = async (): Promise => { const { done, value } = await reader.read(); if (done) { diff --git a/packages/agent/src/acp-extensions.ts b/packages/agent/src/acp-extensions.ts index 2e15d76a2..62a2a1083 100644 --- a/packages/agent/src/acp-extensions.ts +++ b/packages/agent/src/acp-extensions.ts @@ -63,9 +63,6 @@ export const POSTHOG_NOTIFICATIONS = { /** Token usage update for a session turn */ USAGE_UPDATE: "_posthog/usage_update", - - /** Client requests MCP server reconnection with updated credentials */ - REFRESH_MCP: "_posthog/refresh_mcp", } as const; type NotificationMethod = diff --git a/packages/agent/src/adapters/claude/claude-agent.ts b/packages/agent/src/adapters/claude/claude-agent.ts index fb6fcf48a..54842af00 100644 --- a/packages/agent/src/adapters/claude/claude-agent.ts +++ b/packages/agent/src/adapters/claude/claude-agent.ts @@ -837,6 +837,7 @@ export class ClaudeAcpAgent extends BaseAcpAgent { CODE_EXECUTION_MODES.includes(meta.permissionMode as CodeExecutionMode) ? (meta.permissionMode as CodeExecutionMode) : "default"; + const options = buildSessionOptions({ cwd, mcpServers,