Skip to content

Commit 57eafa7

Browse files
committed
fix(dev): use globalThis for singleton state to prevent HMR memory leaks
1 parent cd66774 commit 57eafa7

6 files changed

Lines changed: 134 additions & 81 deletions

File tree

apps/sim/lib/copilot/persistence/tool-confirm/index.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,24 @@ import {
99
import { getAsyncToolCalls } from '@/lib/copilot/async-runs/repository'
1010
import { MothershipStreamV1ToolOutcome } from '@/lib/copilot/generated/mothership-stream-v1'
1111
import { getRedisClient } from '@/lib/core/config/redis'
12-
import { createPubSubChannel } from '@/lib/events/pubsub'
12+
import { createPubSubChannel, type PubSubChannel } from '@/lib/events/pubsub'
1313

1414
const logger = createLogger('CopilotOrchestratorPersistence')
1515
const TOOL_CONFIRMATION_TTL_SECONDS = 60 * 10
1616
const toolConfirmationKey = (toolCallId: string) => `copilot:tool-confirmation:${toolCallId}`
1717

18-
const toolConfirmationChannel = createPubSubChannel<AsyncCompletionEnvelope>({
19-
channel: 'copilot:tool-confirmation',
20-
label: 'CopilotToolConfirmation',
21-
})
18+
type ToolConfirmGlobal = typeof globalThis & {
19+
_toolConfirmationChannel?: PubSubChannel<AsyncCompletionEnvelope>
20+
}
21+
22+
const _g = globalThis as ToolConfirmGlobal
23+
if (!_g._toolConfirmationChannel) {
24+
_g._toolConfirmationChannel = createPubSubChannel<AsyncCompletionEnvelope>({
25+
channel: 'copilot:tool-confirmation',
26+
label: 'CopilotToolConfirmation',
27+
})
28+
}
29+
const toolConfirmationChannel = _g._toolConfirmationChannel
2230

2331
/**
2432
* Get a tool call confirmation state from the durable async tool row.

apps/sim/lib/copilot/tasks.ts

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
* Channel: `task:status_changed`
88
*/
99

10-
import { createPubSubChannel } from '@/lib/events/pubsub'
10+
import { createPubSubChannel, type PubSubChannel } from '@/lib/events/pubsub'
1111

1212
interface TaskStatusEvent {
1313
workspaceId: string
@@ -16,10 +16,20 @@ interface TaskStatusEvent {
1616
streamId?: string
1717
}
1818

19-
const channel =
20-
typeof window !== 'undefined'
21-
? null
22-
: createPubSubChannel<TaskStatusEvent>({ channel: 'task:status_changed', label: 'task' })
19+
type TaskPubSubGlobal = typeof globalThis & {
20+
_taskStatusChannel?: PubSubChannel<TaskStatusEvent> | null
21+
}
22+
23+
const g = globalThis as TaskPubSubGlobal
24+
25+
if (!('_taskStatusChannel' in g)) {
26+
g._taskStatusChannel =
27+
typeof window !== 'undefined'
28+
? null
29+
: createPubSubChannel<TaskStatusEvent>({ channel: 'task:status_changed', label: 'task' })
30+
}
31+
32+
const channel = g._taskStatusChannel
2333

2434
export const taskPubSub = channel
2535
? {

apps/sim/lib/core/config/redis.ts

Lines changed: 59 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -54,55 +54,65 @@ export function getRedisConnectionDefaults(
5454
}
5555
}
5656

57-
let globalRedisClient: Redis | null = null
58-
let pingFailures = 0
59-
let pingInterval: NodeJS.Timeout | null = null
60-
let pingInFlight = false
57+
interface RedisState {
58+
client: Redis | null
59+
pingFailures: number
60+
pingInterval: NodeJS.Timeout | null
61+
pingInFlight: boolean
62+
reconnectListeners: Array<() => void>
63+
}
64+
65+
const g = globalThis as typeof globalThis & { _redisState?: RedisState }
66+
if (!g._redisState) {
67+
g._redisState = {
68+
client: null,
69+
pingFailures: 0,
70+
pingInterval: null,
71+
pingInFlight: false,
72+
reconnectListeners: [],
73+
}
74+
}
75+
const state = g._redisState
6176

6277
const PING_INTERVAL_MS = 15_000
6378
const MAX_PING_FAILURES = 2
6479

65-
/** Callbacks invoked when the PING health check forces a reconnect. */
66-
const reconnectListeners: Array<() => void> = []
67-
6880
/**
6981
* Register a callback that fires when the PING health check forces a reconnect.
7082
* Useful for resetting cached adapters that hold a stale Redis reference.
7183
*/
7284
export function onRedisReconnect(cb: () => void): void {
73-
reconnectListeners.push(cb)
85+
state.reconnectListeners.push(cb)
7486
}
7587

7688
function startPingHealthCheck(redis: Redis): void {
77-
if (pingInterval) return
89+
if (state.pingInterval) return
7890

79-
pingInterval = setInterval(async () => {
80-
if (pingInFlight) return
81-
pingInFlight = true
91+
state.pingInterval = setInterval(async () => {
92+
if (state.pingInFlight) return
93+
state.pingInFlight = true
8294
try {
8395
await redis.ping()
84-
pingFailures = 0
96+
state.pingFailures = 0
8597
} catch (error) {
86-
pingFailures++
98+
state.pingFailures++
8799
logger.warn('Redis PING failed', {
88-
consecutiveFailures: pingFailures,
100+
consecutiveFailures: state.pingFailures,
89101
error: toError(error).message,
90102
})
91103

92-
if (pingFailures >= MAX_PING_FAILURES) {
104+
if (state.pingFailures >= MAX_PING_FAILURES) {
93105
logger.error('Redis PING failed consecutive times — forcing reconnect', {
94-
consecutiveFailures: pingFailures,
106+
consecutiveFailures: state.pingFailures,
95107
})
96-
pingFailures = 0
97-
// Drop the cached client and stop this health check before disconnecting,
98-
// so the next getRedisClient() builds a fresh client and a fresh PING loop.
99-
// Listeners may call getRedisClient() and must observe the cleared global.
100-
globalRedisClient = null
101-
if (pingInterval) {
102-
clearInterval(pingInterval)
103-
pingInterval = null
108+
state.pingFailures = 0
109+
// Clear before notifying listeners — they may call getRedisClient() and must see the reset state.
110+
state.client = null
111+
if (state.pingInterval) {
112+
clearInterval(state.pingInterval)
113+
state.pingInterval = null
104114
}
105-
for (const cb of reconnectListeners) {
115+
for (const cb of state.reconnectListeners) {
106116
try {
107117
cb()
108118
} catch (cbError) {
@@ -116,7 +126,7 @@ function startPingHealthCheck(redis: Redis): void {
116126
}
117127
}
118128
} finally {
119-
pingInFlight = false
129+
state.pingInFlight = false
120130
}
121131
}, PING_INTERVAL_MS)
122132
}
@@ -131,15 +141,15 @@ function startPingHealthCheck(redis: Redis): void {
131141
export function getRedisClient(): Redis | null {
132142
if (typeof window !== 'undefined') return null
133143
if (!redisUrl) return null
134-
if (globalRedisClient) return globalRedisClient
144+
if (state.client) return state.client
135145

136146
// Outside the try/catch so config errors aren't silently swallowed.
137147
const defaults = getRedisConnectionDefaults(redisUrl)
138148

139149
try {
140150
logger.info('Initializing Redis client')
141151

142-
globalRedisClient = new Redis(redisUrl, {
152+
state.client = new Redis(redisUrl, {
143153
...defaults,
144154
commandTimeout: 5000,
145155
maxRetriesPerRequest: 5,
@@ -162,17 +172,17 @@ export function getRedisClient(): Redis | null {
162172
},
163173
})
164174

165-
globalRedisClient.on('connect', () => logger.info('Redis connected'))
166-
globalRedisClient.on('ready', () => logger.info('Redis ready'))
167-
globalRedisClient.on('error', (err: Error) => {
175+
state.client.on('connect', () => logger.info('Redis connected'))
176+
state.client.on('ready', () => logger.info('Redis ready'))
177+
state.client.on('error', (err: Error) => {
168178
logger.error('Redis error', { error: err.message, code: (err as any).code })
169179
})
170-
globalRedisClient.on('close', () => logger.warn('Redis connection closed'))
171-
globalRedisClient.on('end', () => logger.error('Redis connection ended'))
180+
state.client.on('close', () => logger.warn('Redis connection closed'))
181+
state.client.on('end', () => logger.error('Redis connection ended'))
172182

173-
startPingHealthCheck(globalRedisClient)
183+
startPingHealthCheck(state.client)
174184

175-
return globalRedisClient
185+
return state.client
176186
} catch (error) {
177187
logger.error('Failed to initialize Redis client', { error })
178188
return null
@@ -274,18 +284,18 @@ export async function extendLock(
274284
* Use for graceful shutdown.
275285
*/
276286
export async function closeRedisConnection(): Promise<void> {
277-
if (pingInterval) {
278-
clearInterval(pingInterval)
279-
pingInterval = null
287+
if (state.pingInterval) {
288+
clearInterval(state.pingInterval)
289+
state.pingInterval = null
280290
}
281291

282-
if (globalRedisClient) {
292+
if (state.client) {
283293
try {
284-
await globalRedisClient.quit()
294+
await state.client.quit()
285295
} catch (error) {
286296
logger.error('Error closing Redis connection', { error })
287297
} finally {
288-
globalRedisClient = null
298+
state.client = null
289299
}
290300
}
291301
}
@@ -294,12 +304,12 @@ export async function closeRedisConnection(): Promise<void> {
294304
* Reset all module-level state. Only intended for use in tests.
295305
*/
296306
export function resetForTesting(): void {
297-
if (pingInterval) {
298-
clearInterval(pingInterval)
299-
pingInterval = null
307+
if (state.pingInterval) {
308+
clearInterval(state.pingInterval)
309+
state.pingInterval = null
300310
}
301-
globalRedisClient = null
302-
pingFailures = 0
303-
pingInFlight = false
304-
reconnectListeners.length = 0
311+
state.client = null
312+
state.pingFailures = 0
313+
state.pingInFlight = false
314+
state.reconnectListeners.length = 0
305315
}

apps/sim/lib/execution/cancellation.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,20 @@ export type ExecutionCancellationRecordResult =
1919
reason: 'redis_unavailable' | 'redis_write_failed'
2020
}
2121

22-
let sharedChannel: PubSubChannel<ExecutionCancelEvent> | null = null
22+
type CancellationGlobal = typeof globalThis & {
23+
_executionCancelChannel?: PubSubChannel<ExecutionCancelEvent>
24+
}
25+
26+
const _g = globalThis as CancellationGlobal
2327

2428
export function getCancellationChannel(): PubSubChannel<ExecutionCancelEvent> {
25-
if (!sharedChannel) {
26-
sharedChannel = createPubSubChannel<ExecutionCancelEvent>({
29+
if (!_g._executionCancelChannel) {
30+
_g._executionCancelChannel = createPubSubChannel<ExecutionCancelEvent>({
2731
channel: EXECUTION_CANCEL_CHANNEL,
2832
label: 'execution-cancel',
2933
})
3034
}
31-
return sharedChannel
35+
return _g._executionCancelChannel
3236
}
3337

3438
export function isRedisCancellationEnabled(): boolean {

apps/sim/lib/mcp/connection-manager.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,13 @@ export class McpConnectionManager {
461461
}
462462
}
463463

464-
export const mcpConnectionManager: McpConnectionManager | null = isTest
465-
? null
466-
: new McpConnectionManager()
464+
type McpManagerGlobal = typeof globalThis & {
465+
_mcpConnectionManager?: McpConnectionManager | null
466+
}
467+
468+
const _g = globalThis as McpManagerGlobal
469+
if (!('_mcpConnectionManager' in _g)) {
470+
_g._mcpConnectionManager = isTest ? null : new McpConnectionManager()
471+
}
472+
473+
export const mcpConnectionManager: McpConnectionManager | null = _g._mcpConnectionManager

apps/sim/lib/mcp/pubsub.ts

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* (published by serve route, consumed by serve route on other processes to push to local SSE clients)
1212
*/
1313

14-
import { createPubSubChannel } from '@/lib/events/pubsub'
14+
import { createPubSubChannel, type PubSubChannel } from '@/lib/events/pubsub'
1515
import type { ToolsChangedEvent, WorkflowToolsChangedEvent } from '@/lib/mcp/types'
1616

1717
interface McpPubSubAdapter {
@@ -22,21 +22,35 @@ interface McpPubSubAdapter {
2222
dispose(): void
2323
}
2424

25-
const toolsChannel =
26-
typeof window !== 'undefined'
27-
? null
28-
: createPubSubChannel<ToolsChangedEvent>({
29-
channel: 'mcp:tools_changed',
30-
label: 'mcp-tools',
31-
})
25+
type McpPubSubGlobal = typeof globalThis & {
26+
_mcpToolsChannel?: PubSubChannel<ToolsChangedEvent> | null
27+
_mcpWorkflowToolsChannel?: PubSubChannel<WorkflowToolsChangedEvent> | null
28+
}
3229

33-
const workflowToolsChannel =
34-
typeof window !== 'undefined'
35-
? null
36-
: createPubSubChannel<WorkflowToolsChangedEvent>({
37-
channel: 'mcp:workflow_tools_changed',
38-
label: 'mcp-workflow-tools',
39-
})
30+
const g = globalThis as McpPubSubGlobal
31+
32+
if (!('_mcpToolsChannel' in g)) {
33+
g._mcpToolsChannel =
34+
typeof window !== 'undefined'
35+
? null
36+
: createPubSubChannel<ToolsChangedEvent>({
37+
channel: 'mcp:tools_changed',
38+
label: 'mcp-tools',
39+
})
40+
}
41+
42+
if (!('_mcpWorkflowToolsChannel' in g)) {
43+
g._mcpWorkflowToolsChannel =
44+
typeof window !== 'undefined'
45+
? null
46+
: createPubSubChannel<WorkflowToolsChangedEvent>({
47+
channel: 'mcp:workflow_tools_changed',
48+
label: 'mcp-workflow-tools',
49+
})
50+
}
51+
52+
const toolsChannel = g._mcpToolsChannel
53+
const workflowToolsChannel = g._mcpWorkflowToolsChannel
4054

4155
export const mcpPubSub: McpPubSubAdapter | null =
4256
typeof window !== 'undefined' || !toolsChannel || !workflowToolsChannel

0 commit comments

Comments
 (0)