Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 91 additions & 21 deletions src/assets/__tests__/__snapshots__/assets.snapshot.test.ts.snap
Original file line number Diff line number Diff line change
Expand Up @@ -7695,18 +7695,35 @@ async function getOrCreateAgent(sessionId: string, actorId: string): Promise<Age
return agent;
}
{{else}}
let cachedAgent: Agent | null = null;

async function getOrCreateAgent(): Promise<Agent> {
if (!cachedAgent) {
const model = await loadModel();
cachedAgent = new Agent({
model,
systemPrompt: SYSTEM_PROMPT,
tools,
});
const AGENT_CACHE_LIMIT = 128;

// Reuses one Agent per sessionId so each session keeps its own in-process
// conversation history (best-effort; resets on cold start). A Map preserves
// insertion order, so it doubles as an LRU bounded to 128 sessions — a local
// dev process serving many sessions cannot leak history between them or grow
// without bound. On AgentCore Runtime each microVM serves a single session, so
// this holds one entry. For durable history, attach memory.
const agentCache = new Map<string, Agent>();

async function getOrCreateAgent(sessionId: string): Promise<Agent> {
const existing = agentCache.get(sessionId);
if (existing) {
agentCache.delete(sessionId);
agentCache.set(sessionId, existing);
return existing;
}
if (agentCache.size >= AGENT_CACHE_LIMIT) {
const oldest = agentCache.keys().next().value;
if (oldest !== undefined) agentCache.delete(oldest);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

qq: do we need to do any clean up here aside from just purging oldest agent?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question — no, purging the oldest agent is sufficient; there's nothing else to tear down.

I checked what a Strands TS Agent holds and what the template gives it:

  • The Agent has no close/dispose/[Symbol.asyncDispose] — it's a plain object holding in-memory state (messages, modelState, the tool/conversation registries). Dropping the last reference makes it GC-eligible; there are no per-agent sockets, file handles, or timers to release.
  • The only things that could hold a connection — the MCP clients and the model — are module-level singletons in the template (const mcpClients = [...], const tools = [...] at top scope), shared by reference across every cached agent. They're intentionally not per-agent, so eviction must not close them (other live sessions are still using them); they persist for the process lifetime by design.

So eviction is just a delete of the map entry, matching the Python agent_factory (cache.popitem() with no teardown). If a future change gave each agent its own MCP client, that's when we'd need a disposal hook — but that's not the case here.

}
return cachedAgent;
const model = await loadModel();
const agent = new Agent({
model,
systemPrompt: SYSTEM_PROMPT,
tools,
});
agentCache.set(sessionId, agent);
return agent;
}
{{/if}}

Expand All @@ -7718,7 +7735,8 @@ const app = new BedrockAgentCoreApp({
const actorId = getActorId(payload, context);
const agent = await getOrCreateAgent(sessionId, actorId);
{{else}}
const agent = await getOrCreateAgent();
const sessionId = context?.sessionId ?? 'default-session';
const agent = await getOrCreateAgent(sessionId);
{{/if}}

{{#if hasMemory}}
Expand All @@ -7739,14 +7757,26 @@ const app = new BedrockAgentCoreApp({
await agent.memoryManager?.flush();
}
{{else}}
for await (const event of agent.stream(payload.prompt ?? '')) {
if (
event.type === 'modelStreamUpdateEvent' &&
event.event?.type === 'modelContentBlockDeltaEvent' &&
event.event.delta?.type === 'textDelta'
) {
yield { data: event.event.delta.text };
// Snapshot history before streaming so a failed turn can be rolled back.
// Agent.stream() appends the user message before invoking the model; on a
// mid-stream error that user turn would otherwise linger in the cached
// agent, and the next turn for this session would send consecutive user
// messages (rejected by providers that require strict role alternation,
// e.g. Anthropic). Restoring on error keeps the session reusable.
const snapshot = agent.takeSnapshot({ include: ['messages'] });
try {
for await (const event of agent.stream(payload.prompt ?? '')) {
if (
event.type === 'modelStreamUpdateEvent' &&
event.event?.type === 'modelContentBlockDeltaEvent' &&
event.event.delta?.type === 'textDelta'
) {
yield { data: event.event.delta.text };
}
}
} catch (error) {
agent.loadSnapshot(snapshot);
throw error;
}
{{/if}}
},
Expand Down Expand Up @@ -8068,24 +8098,64 @@ Thumbs.db

exports[`Assets Directory Snapshots > TypeScript assets > typescript/typescript/http/vercelai/base/main.ts should match snapshot 1`] = `
"import { BedrockAgentCoreApp } from 'bedrock-agentcore/runtime';
import { streamText } from 'ai';
import { streamText, type ModelMessage } from 'ai';
import { loadModel } from './model/load.js';

const SYSTEM_PROMPT = \`You are a helpful assistant.\`;

const HISTORY_LIMIT = 128;

// Keeps one message history per sessionId so each session remembers its own
// turns (best-effort; resets on cold start). A Map preserves insertion order,
// so it doubles as an LRU bounded to 128 sessions — a local dev process serving
// many sessions cannot leak history between them or grow without bound. On
// AgentCore Runtime each microVM serves a single session, so this holds one
// entry. For durable history, persist messages to an external store.
const histories = new Map<string, ModelMessage[]>();

function getHistory(sessionId: string): ModelMessage[] {
const existing = histories.get(sessionId);
if (existing) {
histories.delete(sessionId);
histories.set(sessionId, existing);
return existing;
}
if (histories.size >= HISTORY_LIMIT) {
const oldest = histories.keys().next().value;
if (oldest !== undefined) histories.delete(oldest);
}
const fresh: ModelMessage[] = [];
histories.set(sessionId, fresh);
return fresh;
}

const app = new BedrockAgentCoreApp({
invocationHandler: {
async *process(payload: any, context: any) {
const sessionId = context?.sessionId ?? 'default-session';
const history = getHistory(sessionId);
const userMessage: ModelMessage = { role: 'user', content: payload.prompt ?? '' };

const model = await loadModel();
const result = streamText({
model,
system: SYSTEM_PROMPT,
prompt: payload.prompt ?? '',
messages: [...history, userMessage],
});

let assistant = '';
for await (const chunk of result.textStream) {
assistant += chunk;
yield { data: chunk };
}

// Commit the exchange to history only after a non-empty reply. On a failed
// or empty stream the turn is dropped instead of leaving a dangling user
// (or empty assistant) message — consecutive same-role or empty-content
// messages would otherwise be rejected on the next turn for this session.
if (assistant.length > 0) {
history.push(userMessage, { role: 'assistant', content: assistant });
}
},
},
});
Expand Down
66 changes: 48 additions & 18 deletions src/assets/typescript/http/strands/base/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,35 @@ async function getOrCreateAgent(sessionId: string, actorId: string): Promise<Age
return agent;
}
{{else}}
let cachedAgent: Agent | null = null;
const AGENT_CACHE_LIMIT = 128;

async function getOrCreateAgent(): Promise<Agent> {
if (!cachedAgent) {
const model = await loadModel();
cachedAgent = new Agent({
model,
systemPrompt: SYSTEM_PROMPT,
tools,
});
// Reuses one Agent per sessionId so each session keeps its own in-process
// conversation history (best-effort; resets on cold start). A Map preserves
// insertion order, so it doubles as an LRU bounded to 128 sessions — a local
// dev process serving many sessions cannot leak history between them or grow
// without bound. On AgentCore Runtime each microVM serves a single session, so
// this holds one entry. For durable history, attach memory.
const agentCache = new Map<string, Agent>();

async function getOrCreateAgent(sessionId: string): Promise<Agent> {
const existing = agentCache.get(sessionId);
if (existing) {
agentCache.delete(sessionId);
agentCache.set(sessionId, existing);
return existing;
}
if (agentCache.size >= AGENT_CACHE_LIMIT) {
const oldest = agentCache.keys().next().value;
if (oldest !== undefined) agentCache.delete(oldest);
}
return cachedAgent;
const model = await loadModel();
const agent = new Agent({
model,
systemPrompt: SYSTEM_PROMPT,
tools,
});
agentCache.set(sessionId, agent);
return agent;
}
{{/if}}

Expand All @@ -76,7 +93,8 @@ const app = new BedrockAgentCoreApp({
const actorId = getActorId(payload, context);
const agent = await getOrCreateAgent(sessionId, actorId);
{{else}}
const agent = await getOrCreateAgent();
const sessionId = context?.sessionId ?? 'default-session';
const agent = await getOrCreateAgent(sessionId);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mid-stream failure on a cached Strands Agent poisons the session — same class of bug as the one we just fixed in VercelAI, but on this branch it survives because nothing rolls back the agent's internal state.

Looking at @strands-agents/sdk 1.7.0 (dist/src/agent/agent.js), Agent.stream(prompt) appends the user message to this.messages before invoking the model:

// Normalize input and append user messages on first invocation only
if (currentArgs !== undefined) {
  const messagesToAppend = this._normalizeInput(currentArgs);
  for (const message of messagesToAppend) {
    yield this._appendMessage(message, invocationState);   // <-- user msg committed here
  }
  ...
}
...
const modelResult = yield* this._invokeModel(invocationState, structuredOutputChoice);

The outer catch (error) only does special handling for CancelledError / InterruptError; a generic error from _invokeModel (e.g. Bedrock throttling, transient provider error) just re-throws after closing spans — this.messages keeps the dangling user turn. Because we now cache one Agent per sessionId, the next invocation on that session calls _normalizeInput again and appends another user message, so the agent's history ends with two consecutive user messages and Bedrock/Anthropic providers strict-reject the request from then on. Single transient error → session permanently broken for the lifetime of the process.

Before this PR, cachedAgent was a single global, so this was already latent — but limited to "the whole process gets stuck" rather than per-session. Now that we're explicitly selling "per-session in-process short-term memory", the failure mode is more visible and worth closing.

A few ways to fix:

  1. Snapshot/restore via the SDK. Agent exposes takeSnapshot({ include: ['messages'] }) and loadSnapshot(snapshot) (see dist/src/agent/agent.d.ts). Before agent.stream(...), take a messages snapshot; in a catch, loadSnapshot it and rethrow. Cleanest because it uses the supported public API.
  2. Length-based truncation. Capture agent.messages.length before the loop; on error, splice back to that length and rethrow. Lower-level but doesn't depend on snapshot semantics.
  3. Evict on error. Wrap the for await in try/catch; on error agentCache.delete(sessionId) and rethrow. Simpler, but the session loses all prior history on any transient failure — fine as a stopgap if the SDK snapshot API isn't stable yet.

Option 1 matches the SDK's own "keep agent.messages reinvokable at all times" design intent (per the deferred-append comment in agent.js) and is what I'd lean toward. Option 3 is the minimal viable fix if you'd rather not introduce a snapshot dependency in the template.

Worth verifying by reproducing it the same way as the VercelAI case: force a provider error on turn 1 (e.g. an invalid model id, or temporary throttling), then send turn 2 on the same --session-id and confirm it isn't rejected with a "consecutive user messages" / role-validation error from Bedrock.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 1540ed6 with Option 1 (snapshot/restore via the SDK's takeSnapshot/loadSnapshot).

I reproduced it as you suggested and want to share the nuance I found, because it changes the severity (not the fix):

The dangling user turn is real — after a forced turn-1 failure on a cached agent, agent.messages is left as ["user"], and a subsequent turn appends another, giving ["user","user","assistant"]. Strands' model adapters don't merge consecutive same-role messages (AnthropicModel._formatMessages is a plain .map), so the doubled user turn is sent as-is.

But whether it actually breaks is provider-dependent:

  • Bedrock (the template's default modelProvider): I tested this both through the Agent and through a raw ConverseCommand with two consecutive user messages — both requests were accepted, and the model answered correctly. So on the default provider the session is not poisoned.
  • Anthropic / OpenAI direct: those APIs require strict user/assistant alternation, so the doubled user turn would be rejected from then on.

Since the template ships all four providers, I fixed it regardless rather than rely on Bedrock's leniency.

The guard snapshots messages before the stream and restores on error, scoped to the no-memory branch only — the hasMemory branch persists through AgentCoreMemorySessionManager (server-side events via hooks), where rolling back local agent.messages wouldn't undo the server write, so it deliberately keeps its existing try/finally … flush() shape.

const snapshot = agent.takeSnapshot({ include: ['messages'] });
try {
  for await (const event of agent.stream(payload.prompt ?? '')) { /* … */ }
} catch (error) {
  agent.loadSnapshot(snapshot);
  throw error;
}

Verified end-to-end through the BedrockAgentCoreApp handler with a model that throws on turn 1: turn 1 returns an SSE error and rolls back, and turn 2 on the same sessionId runs cleanly with ["user","assistant"] history instead of a poisoned ["user","user",…].

{{/if}}

{{#if hasMemory}}
Expand All @@ -97,14 +115,26 @@ const app = new BedrockAgentCoreApp({
await agent.memoryManager?.flush();
}
{{else}}
for await (const event of agent.stream(payload.prompt ?? '')) {
if (
event.type === 'modelStreamUpdateEvent' &&
event.event?.type === 'modelContentBlockDeltaEvent' &&
event.event.delta?.type === 'textDelta'
) {
yield { data: event.event.delta.text };
// Snapshot history before streaming so a failed turn can be rolled back.
// Agent.stream() appends the user message before invoking the model; on a
// mid-stream error that user turn would otherwise linger in the cached
// agent, and the next turn for this session would send consecutive user
// messages (rejected by providers that require strict role alternation,
// e.g. Anthropic). Restoring on error keeps the session reusable.
const snapshot = agent.takeSnapshot({ include: ['messages'] });
try {
for await (const event of agent.stream(payload.prompt ?? '')) {
if (
event.type === 'modelStreamUpdateEvent' &&
event.event?.type === 'modelContentBlockDeltaEvent' &&
event.event.delta?.type === 'textDelta'
) {
yield { data: event.event.delta.text };
}
}
} catch (error) {
agent.loadSnapshot(snapshot);
throw error;
}
{{/if}}
},
Expand Down
44 changes: 42 additions & 2 deletions src/assets/typescript/http/vercelai/base/main.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,62 @@
import { BedrockAgentCoreApp } from 'bedrock-agentcore/runtime';
import { streamText } from 'ai';
import { streamText, type ModelMessage } from 'ai';
import { loadModel } from './model/load.js';

const SYSTEM_PROMPT = `You are a helpful assistant.`;

const HISTORY_LIMIT = 128;

// Keeps one message history per sessionId so each session remembers its own
// turns (best-effort; resets on cold start). A Map preserves insertion order,
// so it doubles as an LRU bounded to 128 sessions — a local dev process serving
// many sessions cannot leak history between them or grow without bound. On
// AgentCore Runtime each microVM serves a single session, so this holds one
// entry. For durable history, persist messages to an external store.
const histories = new Map<string, ModelMessage[]>();

/**
 * Looks up the message history for a session, creating an empty one on first use.
 *
 * Every call re-inserts the session's entry into `histories`, which moves it to
 * the end of the Map's insertion order and so marks it as most recently used.
 * On a miss, when the map already holds `HISTORY_LIMIT` entries, the entry at
 * the front of the Map (the least recently used) is removed before the new one
 * is added.
 *
 * @param sessionId - Key identifying the session whose history is wanted.
 * @returns The array stored in `histories` for this session. It is returned by
 *   reference, so messages pushed onto it are visible to later calls.
 */
function getHistory(sessionId: string): ModelMessage[] {
  let messages = histories.get(sessionId);

  if (!messages) {
    // Miss: make room first so the map never exceeds HISTORY_LIMIT entries.
    if (histories.size >= HISTORY_LIMIT) {
      // The first key in iteration order is the least recently used session.
      const [lruSessionId] = histories.keys();
      if (lruSessionId !== undefined) histories.delete(lruSessionId);
    }
    messages = [];
  } else {
    // Hit: remove the entry so the set() below re-appends it at the end.
    histories.delete(sessionId);
  }

  histories.set(sessionId, messages);
  return messages;
}

const app = new BedrockAgentCoreApp({
invocationHandler: {
async *process(payload: any, context: any) {
const sessionId = context?.sessionId ?? 'default-session';
const history = getHistory(sessionId);
const userMessage: ModelMessage = { role: 'user', content: payload.prompt ?? '' };

const model = await loadModel();
const result = streamText({
model,
system: SYSTEM_PROMPT,
prompt: payload.prompt ?? '',
messages: [...history, userMessage],
});

let assistant = '';
for await (const chunk of result.textStream) {
assistant += chunk;
yield { data: chunk };
}

// Commit the exchange to history only after a non-empty reply. On a failed
// or empty stream the turn is dropped instead of leaving a dangling user
// (or empty assistant) message — consecutive same-role or empty-content
// messages would otherwise be rejected on the next turn for this session.
if (assistant.length > 0) {
history.push(userMessage, { role: 'assistant', content: assistant });
}
},
},
});
Expand Down
Loading