Skip to content

Commit ee9890e

Browse files
committed
Mship block stream
1 parent c403faf commit ee9890e

3 files changed

Lines changed: 679 additions & 56 deletions

File tree

apps/sim/app/api/mothership/execute/route.ts

Lines changed: 192 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,13 @@ import { parseRequest } from '@/lib/api/server'
77
import { checkInternalAuth } from '@/lib/auth/hybrid'
88
import { buildIntegrationToolSchemas } from '@/lib/copilot/chat/payload'
99
import { generateWorkspaceContext } from '@/lib/copilot/chat/workspace-context'
10+
import {
11+
MothershipStreamV1EventType,
12+
MothershipStreamV1TextChannel,
13+
} from '@/lib/copilot/generated/mothership-stream-v1'
1014
import { runHeadlessCopilotLifecycle } from '@/lib/copilot/request/lifecycle/headless'
1115
import { requestExplicitStreamAbort } from '@/lib/copilot/request/session/explicit-abort'
16+
import type { StreamEvent } from '@/lib/copilot/request/types'
1217
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
1318
import { buildMothershipToolsForRequest } from '@/lib/mothership/settings/runtime'
1419
import {
@@ -19,17 +24,59 @@ import {
1924
export const maxDuration = 3600
2025

2126
const logger = createLogger('MothershipExecuteAPI')
27+
const MOTHERSHIP_EXECUTE_STREAM_HEADER = 'x-mothership-execute-stream'
28+
const MOTHERSHIP_EXECUTE_STREAM_VALUE = 'ndjson'
29+
const MOTHERSHIP_EXECUTE_STREAM_CONTENT_TYPE = 'application/x-ndjson'
30+
const MOTHERSHIP_EXECUTE_HEARTBEAT_INTERVAL_MS = 15_000
2231

2332
function isAbortError(error: unknown): boolean {
2433
return error instanceof Error && error.name === 'AbortError'
2534
}
2635

36+
function wantsStreamedExecuteResponse(req: NextRequest): boolean {
37+
return (
38+
req.headers.get(MOTHERSHIP_EXECUTE_STREAM_HEADER) === MOTHERSHIP_EXECUTE_STREAM_VALUE ||
39+
req.headers.get('accept')?.includes(MOTHERSHIP_EXECUTE_STREAM_CONTENT_TYPE) === true
40+
)
41+
}
42+
43+
function encodeNdjson(value: unknown): Uint8Array {
44+
return new TextEncoder().encode(`${JSON.stringify(value)}\n`)
45+
}
46+
47+
function buildExecuteResponsePayload(
48+
result: Awaited<ReturnType<typeof runHeadlessCopilotLifecycle>>,
49+
effectiveChatId: string,
50+
integrationTools: Array<{ name: string }>
51+
) {
52+
const clientToolNames = new Set(integrationTools.map((t) => t.name))
53+
const clientToolCalls = (result.toolCalls || []).filter(
54+
(tc: { name: string }) => clientToolNames.has(tc.name) || tc.name.startsWith('mcp-')
55+
)
56+
57+
return {
58+
content: result.content,
59+
model: 'mothership',
60+
conversationId: effectiveChatId,
61+
tokens: result.usage
62+
? {
63+
prompt: result.usage.prompt,
64+
completion: result.usage.completion,
65+
total: (result.usage.prompt || 0) + (result.usage.completion || 0),
66+
}
67+
: {},
68+
cost: result.cost || undefined,
69+
toolCalls: clientToolCalls,
70+
}
71+
}
72+
2773
/**
2874
* POST /api/mothership/execute
2975
*
30-
* Non-streaming endpoint for Mothership block execution within workflows.
31-
* Called by the executor via internal JWT auth, not by the browser directly.
32-
* Consumes the Go SSE stream internally and returns a single JSON response.
76+
* Endpoint for Mothership block execution within workflows. Called by the
77+
* executor via internal JWT auth, not by the browser directly. JSON callers get
78+
* a single final response; NDJSON callers get heartbeats followed by a final
79+
* event so long-running headless requests do not look idle to HTTP stacks.
3380
*/
3481
export const POST = withRouteHandler(async (req: NextRequest) => {
3582
let messageId: string | undefined
@@ -100,7 +147,8 @@ export const POST = withRouteHandler(async (req: NextRequest) => {
100147

101148
let allowExplicitAbort = true
102149
let explicitAbortRequest: Promise<void> | undefined
103-
const onAbort = () => {
150+
const lifecycleAbortController = new AbortController()
151+
const requestExplicitAbortOnce = () => {
104152
if (!allowExplicitAbort || explicitAbortRequest || !messageId) {
105153
return
106154
}
@@ -115,15 +163,24 @@ export const POST = withRouteHandler(async (req: NextRequest) => {
115163
})
116164
})
117165
}
166+
const abortLifecycle = (reason?: unknown) => {
167+
if (!lifecycleAbortController.signal.aborted) {
168+
lifecycleAbortController.abort(reason ?? 'mothership_execute_aborted')
169+
}
170+
requestExplicitAbortOnce()
171+
}
172+
const onAbort = () => {
173+
abortLifecycle(req.signal.reason ?? 'request_aborted')
174+
}
118175

119176
if (req.signal.aborted) {
120177
onAbort()
121178
} else {
122179
req.signal.addEventListener('abort', onAbort, { once: true })
123180
}
124181

125-
try {
126-
const result = await runHeadlessCopilotLifecycle(requestPayload, {
182+
const runLifecycle = (onEvent?: (event: StreamEvent) => Promise<void>) =>
183+
runHeadlessCopilotLifecycle(requestPayload, {
127184
userId,
128185
workspaceId,
129186
chatId: effectiveChatId,
@@ -133,12 +190,137 @@ export const POST = withRouteHandler(async (req: NextRequest) => {
133190
goRoute: '/api/mothership/execute',
134191
autoExecuteTools: true,
135192
interactive: false,
136-
abortSignal: req.signal,
193+
abortSignal: lifecycleAbortController.signal,
194+
onEvent,
137195
})
138196

197+
if (wantsStreamedExecuteResponse(req)) {
198+
let cancelled = false
199+
let heartbeatId: ReturnType<typeof setInterval> | undefined
200+
201+
const stream = new ReadableStream<Uint8Array>({
202+
start(controller) {
203+
const send = (event: unknown) => {
204+
if (!cancelled) {
205+
controller.enqueue(encodeNdjson(event))
206+
}
207+
}
208+
209+
// Flush response headers promptly and keep long headless runs from
210+
// looking idle to worker/proxy HTTP stacks.
211+
send({ type: 'heartbeat', timestamp: new Date().toISOString() })
212+
heartbeatId = setInterval(() => {
213+
send({ type: 'heartbeat', timestamp: new Date().toISOString() })
214+
}, MOTHERSHIP_EXECUTE_HEARTBEAT_INTERVAL_MS)
215+
216+
void (async () => {
217+
try {
218+
const result = await runLifecycle(async (event) => {
219+
if (
220+
event.type === MothershipStreamV1EventType.text &&
221+
event.payload.channel === MothershipStreamV1TextChannel.assistant &&
222+
event.payload.text
223+
) {
224+
send({ type: 'chunk', content: event.payload.text })
225+
}
226+
})
227+
allowExplicitAbort = false
228+
229+
if (lifecycleAbortController.signal.aborted) {
230+
send({ type: 'error', error: 'Mothership execution aborted' })
231+
return
232+
}
233+
234+
if (!result.success) {
235+
logger.error(
236+
messageId
237+
? `Mothership execute failed [messageId:${messageId}]`
238+
: 'Mothership execute failed',
239+
{
240+
requestId,
241+
workflowId,
242+
executionId,
243+
error: result.error,
244+
errors: result.errors,
245+
}
246+
)
247+
send({
248+
type: 'error',
249+
error: result.error || 'Mothership execution failed',
250+
content: result.content || '',
251+
})
252+
return
253+
}
254+
255+
send({
256+
type: 'final',
257+
data: buildExecuteResponsePayload(result, effectiveChatId, integrationTools),
258+
})
259+
} catch (error) {
260+
if (
261+
lifecycleAbortController.signal.aborted ||
262+
req.signal.aborted ||
263+
isAbortError(error)
264+
) {
265+
logger.info(
266+
messageId
267+
? `Mothership execute aborted [messageId:${messageId}]`
268+
: 'Mothership execute aborted',
269+
{ requestId }
270+
)
271+
send({ type: 'error', error: 'Mothership execution aborted' })
272+
return
273+
}
274+
275+
logger.error(
276+
messageId
277+
? `Mothership execute error [messageId:${messageId}]`
278+
: 'Mothership execute error',
279+
{
280+
requestId,
281+
error: error instanceof Error ? error.message : 'Unknown error',
282+
}
283+
)
284+
send({
285+
type: 'error',
286+
error: error instanceof Error ? error.message : 'Internal server error',
287+
})
288+
} finally {
289+
allowExplicitAbort = false
290+
if (heartbeatId) {
291+
clearInterval(heartbeatId)
292+
}
293+
req.signal.removeEventListener('abort', onAbort)
294+
await explicitAbortRequest
295+
if (!cancelled) {
296+
controller.close()
297+
}
298+
}
299+
})()
300+
},
301+
cancel(reason) {
302+
cancelled = true
303+
if (heartbeatId) {
304+
clearInterval(heartbeatId)
305+
}
306+
abortLifecycle(reason ?? 'mothership_execute_stream_cancelled')
307+
},
308+
})
309+
310+
return new Response(stream, {
311+
headers: {
312+
'Content-Type': `${MOTHERSHIP_EXECUTE_STREAM_CONTENT_TYPE}; charset=utf-8`,
313+
'Cache-Control': 'no-cache, no-transform',
314+
},
315+
})
316+
}
317+
318+
try {
319+
const result = await runLifecycle()
320+
139321
allowExplicitAbort = false
140322

141-
if (req.signal.aborted) {
323+
if (lifecycleAbortController.signal.aborted || req.signal.aborted) {
142324
reqLogger.info('Mothership execute aborted after lifecycle completion')
143325
return NextResponse.json({ error: 'Mothership execution aborted' }, { status: 499 })
144326
}
@@ -165,25 +347,9 @@ export const POST = withRouteHandler(async (req: NextRequest) => {
165347
)
166348
}
167349

168-
const clientToolNames = new Set(integrationTools.map((t) => t.name))
169-
const clientToolCalls = (result.toolCalls || []).filter(
170-
(tc: { name: string }) => clientToolNames.has(tc.name) || tc.name.startsWith('mcp-')
350+
return NextResponse.json(
351+
buildExecuteResponsePayload(result, effectiveChatId, integrationTools)
171352
)
172-
173-
return NextResponse.json({
174-
content: result.content,
175-
model: 'mothership',
176-
conversationId: effectiveChatId,
177-
tokens: result.usage
178-
? {
179-
prompt: result.usage.prompt,
180-
completion: result.usage.completion,
181-
total: (result.usage.prompt || 0) + (result.usage.completion || 0),
182-
}
183-
: {},
184-
cost: result.cost || undefined,
185-
toolCalls: clientToolCalls,
186-
})
187353
} finally {
188354
allowExplicitAbort = false
189355
req.signal.removeEventListener('abort', onAbort)

0 commit comments

Comments
 (0)