diff --git a/src/assets/__tests__/__snapshots__/assets.snapshot.test.ts.snap b/src/assets/__tests__/__snapshots__/assets.snapshot.test.ts.snap index c13c5320a..77c1d1ee8 100644 --- a/src/assets/__tests__/__snapshots__/assets.snapshot.test.ts.snap +++ b/src/assets/__tests__/__snapshots__/assets.snapshot.test.ts.snap @@ -7695,18 +7695,35 @@ async function getOrCreateAgent(sessionId: string, actorId: string): Promise { - if (!cachedAgent) { - const model = await loadModel(); - cachedAgent = new Agent({ - model, - systemPrompt: SYSTEM_PROMPT, - tools, - }); +const AGENT_CACHE_LIMIT = 128; + +// Reuses one Agent per sessionId so each session keeps its own in-process +// conversation history (best-effort; resets on cold start). A Map preserves +// insertion order, so it doubles as an LRU bounded to 128 sessions — a local +// dev process serving many sessions cannot leak history between them or grow +// without bound. On AgentCore Runtime each microVM serves a single session, so +// this holds one entry. For durable history, attach memory. +const agentCache = new Map(); + +async function getOrCreateAgent(sessionId: string): Promise { + const existing = agentCache.get(sessionId); + if (existing) { + agentCache.delete(sessionId); + agentCache.set(sessionId, existing); + return existing; + } + if (agentCache.size >= AGENT_CACHE_LIMIT) { + const oldest = agentCache.keys().next().value; + if (oldest !== undefined) agentCache.delete(oldest); } - return cachedAgent; + const model = await loadModel(); + const agent = new Agent({ + model, + systemPrompt: SYSTEM_PROMPT, + tools, + }); + agentCache.set(sessionId, agent); + return agent; } {{/if}} @@ -7718,7 +7735,8 @@ const app = new BedrockAgentCoreApp({ const actorId = getActorId(payload, context); const agent = await getOrCreateAgent(sessionId, actorId); {{else}} - const agent = await getOrCreateAgent(); + const sessionId = context?.sessionId ?? 'default-session'; + const agent = await getOrCreateAgent(sessionId); {{/if}} {{#if hasMemory}} @@ -7739,14 +7757,26 @@ const app = new BedrockAgentCoreApp({ await agent.memoryManager?.flush(); } {{else}} - for await (const event of agent.stream(payload.prompt ?? '')) { - if ( - event.type === 'modelStreamUpdateEvent' && - event.event?.type === 'modelContentBlockDeltaEvent' && - event.event.delta?.type === 'textDelta' - ) { - yield { data: event.event.delta.text }; + // Snapshot history before streaming so a failed turn can be rolled back. + // Agent.stream() appends the user message before invoking the model; on a + // mid-stream error that user turn would otherwise linger in the cached + // agent, and the next turn for this session would send consecutive user + // messages (rejected by providers that require strict role alternation, + // e.g. Anthropic). Restoring on error keeps the session reusable. + const snapshot = agent.takeSnapshot({ include: ['messages'] }); + try { + for await (const event of agent.stream(payload.prompt ?? '')) { + if ( + event.type === 'modelStreamUpdateEvent' && + event.event?.type === 'modelContentBlockDeltaEvent' && + event.event.delta?.type === 'textDelta' + ) { + yield { data: event.event.delta.text }; + } } + } catch (error) { + agent.loadSnapshot(snapshot); + throw error; } {{/if}} }, @@ -8068,24 +8098,64 @@ Thumbs.db exports[`Assets Directory Snapshots > TypeScript assets > typescript/typescript/http/vercelai/base/main.ts should match snapshot 1`] = ` "import { BedrockAgentCoreApp } from 'bedrock-agentcore/runtime'; -import { streamText } from 'ai'; +import { streamText, type ModelMessage } from 'ai'; import { loadModel } from './model/load.js'; const SYSTEM_PROMPT = \`You are a helpful assistant.\`; +const HISTORY_LIMIT = 128; + +// Keeps one message history per sessionId so each session remembers its own +// turns (best-effort; resets on cold start). A Map preserves insertion order, +// so it doubles as an LRU bounded to 128 sessions — a local dev process serving +// many sessions cannot leak history between them or grow without bound. On +// AgentCore Runtime each microVM serves a single session, so this holds one +// entry. For durable history, persist messages to an external store. +const histories = new Map(); + +function getHistory(sessionId: string): ModelMessage[] { + const existing = histories.get(sessionId); + if (existing) { + histories.delete(sessionId); + histories.set(sessionId, existing); + return existing; + } + if (histories.size >= HISTORY_LIMIT) { + const oldest = histories.keys().next().value; + if (oldest !== undefined) histories.delete(oldest); + } + const fresh: ModelMessage[] = []; + histories.set(sessionId, fresh); + return fresh; +} + const app = new BedrockAgentCoreApp({ invocationHandler: { async *process(payload: any, context: any) { + const sessionId = context?.sessionId ?? 'default-session'; + const history = getHistory(sessionId); + const userMessage: ModelMessage = { role: 'user', content: payload.prompt ?? '' }; + const model = await loadModel(); const result = streamText({ model, system: SYSTEM_PROMPT, - prompt: payload.prompt ?? '', + messages: [...history, userMessage], }); + let assistant = ''; for await (const chunk of result.textStream) { + assistant += chunk; yield { data: chunk }; } + + // Commit the exchange to history only after a non-empty reply. On a failed + // or empty stream the turn is dropped instead of leaving a dangling user + // (or empty assistant) message — consecutive same-role or empty-content + // messages would otherwise be rejected on the next turn for this session. + if (assistant.length > 0) { + history.push(userMessage, { role: 'assistant', content: assistant }); + } }, }, }); diff --git a/src/assets/typescript/http/strands/base/main.ts b/src/assets/typescript/http/strands/base/main.ts index 4b33f583c..fe7c7136b 100644 --- a/src/assets/typescript/http/strands/base/main.ts +++ b/src/assets/typescript/http/strands/base/main.ts @@ -53,18 +53,35 @@ async function getOrCreateAgent(sessionId: string, actorId: string): Promise { - if (!cachedAgent) { - const model = await loadModel(); - cachedAgent = new Agent({ - model, - systemPrompt: SYSTEM_PROMPT, - tools, - }); +// Reuses one Agent per sessionId so each session keeps its own in-process +// conversation history (best-effort; resets on cold start). A Map preserves +// insertion order, so it doubles as an LRU bounded to 128 sessions — a local +// dev process serving many sessions cannot leak history between them or grow +// without bound. On AgentCore Runtime each microVM serves a single session, so +// this holds one entry. For durable history, attach memory. +const agentCache = new Map(); + +async function getOrCreateAgent(sessionId: string): Promise { + const existing = agentCache.get(sessionId); + if (existing) { + agentCache.delete(sessionId); + agentCache.set(sessionId, existing); + return existing; + } + if (agentCache.size >= AGENT_CACHE_LIMIT) { + const oldest = agentCache.keys().next().value; + if (oldest !== undefined) agentCache.delete(oldest); } - return cachedAgent; + const model = await loadModel(); + const agent = new Agent({ + model, + systemPrompt: SYSTEM_PROMPT, + tools, + }); + agentCache.set(sessionId, agent); + return agent; } {{/if}} @@ -76,7 +93,8 @@ const app = new BedrockAgentCoreApp({ const actorId = getActorId(payload, context); const agent = await getOrCreateAgent(sessionId, actorId); {{else}} - const agent = await getOrCreateAgent(); + const sessionId = context?.sessionId ?? 'default-session'; + const agent = await getOrCreateAgent(sessionId); {{/if}} {{#if hasMemory}} @@ -97,14 +115,26 @@ const app = new BedrockAgentCoreApp({ await agent.memoryManager?.flush(); } {{else}} - for await (const event of agent.stream(payload.prompt ?? '')) { - if ( - event.type === 'modelStreamUpdateEvent' && - event.event?.type === 'modelContentBlockDeltaEvent' && - event.event.delta?.type === 'textDelta' - ) { - yield { data: event.event.delta.text }; + // Snapshot history before streaming so a failed turn can be rolled back. + // Agent.stream() appends the user message before invoking the model; on a + // mid-stream error that user turn would otherwise linger in the cached + // agent, and the next turn for this session would send consecutive user + // messages (rejected by providers that require strict role alternation, + // e.g. Anthropic). Restoring on error keeps the session reusable. + const snapshot = agent.takeSnapshot({ include: ['messages'] }); + try { + for await (const event of agent.stream(payload.prompt ?? '')) { + if ( + event.type === 'modelStreamUpdateEvent' && + event.event?.type === 'modelContentBlockDeltaEvent' && + event.event.delta?.type === 'textDelta' + ) { + yield { data: event.event.delta.text }; + } } + } catch (error) { + agent.loadSnapshot(snapshot); + throw error; } {{/if}} }, diff --git a/src/assets/typescript/http/vercelai/base/main.ts b/src/assets/typescript/http/vercelai/base/main.ts index 09fdb933f..b899c9f31 100644 --- a/src/assets/typescript/http/vercelai/base/main.ts +++ b/src/assets/typescript/http/vercelai/base/main.ts @@ -1,22 +1,62 @@ import { BedrockAgentCoreApp } from 'bedrock-agentcore/runtime'; -import { streamText } from 'ai'; +import { streamText, type ModelMessage } from 'ai'; import { loadModel } from './model/load.js'; const SYSTEM_PROMPT = `You are a helpful assistant.`; +const HISTORY_LIMIT = 128; + +// Keeps one message history per sessionId so each session remembers its own +// turns (best-effort; resets on cold start). A Map preserves insertion order, +// so it doubles as an LRU bounded to 128 sessions — a local dev process serving +// many sessions cannot leak history between them or grow without bound. On +// AgentCore Runtime each microVM serves a single session, so this holds one +// entry. For durable history, persist messages to an external store. +const histories = new Map(); + +function getHistory(sessionId: string): ModelMessage[] { + const existing = histories.get(sessionId); + if (existing) { + histories.delete(sessionId); + histories.set(sessionId, existing); + return existing; + } + if (histories.size >= HISTORY_LIMIT) { + const oldest = histories.keys().next().value; + if (oldest !== undefined) histories.delete(oldest); + } + const fresh: ModelMessage[] = []; + histories.set(sessionId, fresh); + return fresh; +} + const app = new BedrockAgentCoreApp({ invocationHandler: { async *process(payload: any, context: any) { + const sessionId = context?.sessionId ?? 'default-session'; + const history = getHistory(sessionId); + const userMessage: ModelMessage = { role: 'user', content: payload.prompt ?? '' }; + const model = await loadModel(); const result = streamText({ model, system: SYSTEM_PROMPT, - prompt: payload.prompt ?? '', + messages: [...history, userMessage], }); + let assistant = ''; for await (const chunk of result.textStream) { + assistant += chunk; yield { data: chunk }; } + + // Commit the exchange to history only after a non-empty reply. On a failed + // or empty stream the turn is dropped instead of leaving a dangling user + // (or empty assistant) message — consecutive same-role or empty-content + // messages would otherwise be rejected on the next turn for this session. + if (assistant.length > 0) { + history.push(userMessage, { role: 'assistant', content: assistant }); + } }, }, });