diff --git a/core/src/runner/runner.ts b/core/src/runner/runner.ts index cf3b747e..8387c601 100644 --- a/core/src/runner/runner.ts +++ b/core/src/runner/runner.ts @@ -138,7 +138,7 @@ export class Runner { * * @param params.userId The user ID of the session. * @param params.sessionId The session ID of the session. - * @param params.newMessage A new message to append to the session. + * @param params.newMessage An optional new message to append to the session. * @param params.stateDelta An optional state delta to apply to the session. * @param params.runConfig The run config for the agent. * @yields The events generated by the agent. @@ -147,7 +147,7 @@ export class Runner { async *runAsync(params: { userId: string; sessionId: string; - newMessage: Content; + newMessage?: Content; stateDelta?: Record; runConfig?: RunConfig; }): AsyncGenerator { @@ -208,22 +208,22 @@ export class Runner { pluginManager: this.pluginManager, }); - // ========================================================================= - // Preprocess plugins on user message - // ========================================================================= - const pluginUserMessage = - await this.pluginManager.runOnUserMessageCallback({ - userMessage: newMessage, - invocationContext, - }); - if (pluginUserMessage) { - newMessage = pluginUserMessage as Content; - } - // ========================================================================= // Append user message to session // ========================================================================= if (newMessage) { + // ========================================================================= + // Preprocess plugins on user message + // ========================================================================= + const pluginUserMessage = + await this.pluginManager.runOnUserMessageCallback({ + userMessage: newMessage, + invocationContext, + }); + if (pluginUserMessage) { + newMessage = pluginUserMessage as Content; + } + if (!newMessage.parts?.length) { throw new Error('No parts in the newMessage.'); } @@ -264,52 +264,48 @@ export class Runner { // ========================================================================= // Run the agent with the plugins (aka hooks to apply in the lifecycle) // ========================================================================= - if (newMessage) { - // ========================================================================= - // Run the agent with the plugins (aka hooks to apply in the lifecycle) - // ========================================================================= - // Step 1: Run the before_run callbacks to see if we should early exit. - const beforeRunCallbackResponse = - await this.pluginManager.runBeforeRunCallback({ - invocationContext, - }); + // Step 1: Run the before_run callbacks to see if we should early exit. + const beforeRunCallbackResponse = + await this.pluginManager.runBeforeRunCallback({ + invocationContext, + }); - if (beforeRunCallbackResponse) { - const earlyExitEvent = createEvent({ - invocationId: invocationContext.invocationId, - author: 'model', - content: beforeRunCallbackResponse, - }); - // TODO: b/447446338 - In the future, do *not* save live call audio - // content to session This is a feature in Python ADK - await this.sessionService.appendEvent({ - session, - event: earlyExitEvent, - }); - yield earlyExitEvent; - } else { - // Step 2: Otherwise continue with normal execution - for await (const event of invocationContext.agent.runAsync( - invocationContext, - )) { - if (!event.partial) { - await this.sessionService.appendEvent({session, event}); - } - // Step 3: Run the on_event callbacks to optionally modify the event. - const modifiedEvent = - await this.pluginManager.runOnEventCallback({ - invocationContext, - event, - }); - if (modifiedEvent) { - yield modifiedEvent; - } else { - yield event; - } + if (beforeRunCallbackResponse) { + const earlyExitEvent = createEvent({ + invocationId: invocationContext.invocationId, + author: 'model', + content: beforeRunCallbackResponse, + }); + // TODO: b/447446338 - In the future, do *not* save live call audio + // content to session This is a feature in Python ADK + await this.sessionService.appendEvent({ + session, + event: earlyExitEvent, + }); + yield earlyExitEvent; + } else { + // Step 2: Otherwise continue with normal execution + for await (const event of invocationContext.agent.runAsync( + invocationContext, + )) { + if (!event.partial) { + await this.sessionService.appendEvent({session, event}); + } + // Step 3: Run the on_event callbacks to optionally modify the event. + const modifiedEvent = await this.pluginManager.runOnEventCallback( + { + invocationContext, + event, + }, + ); + if (modifiedEvent) { + yield modifiedEvent; + } else { + yield event; } - // Step 4: Run the after_run callbacks to optionally modify the context. - await this.pluginManager.runAfterRunCallback({invocationContext}); } + // Step 4: Run the after_run callbacks to optionally modify the context. + await this.pluginManager.runAfterRunCallback({invocationContext}); } }, ); @@ -435,9 +431,161 @@ export class Runner { } return true; } + + /** + * Runs the agent continuously with a stream of input messages. + * This provides a true concurrent multiplexing model, handling live user + * messages safely during active execution block intervals. + * + * @param params.userId The user ID of the session. + * @param params.sessionId The session ID of the session. + * @param params.inputStream The stream of incoming user content. + * @param params.stateDelta An optional state delta. + * @param params.runConfig The run config for the agent. + * @yields The events generated by the agent and echoed from the user. + */ + async *runStream(params: { + userId: string; + sessionId: string; + inputStream: AsyncIterable; + stateDelta?: Record; + runConfig?: RunConfig; + }): AsyncGenerator { + const outputQueue = new AsyncEventQueue(); + let activeAgentGenerators = 0; + let pendingRun = false; + + // Helper to start the agent execution generator + const runAgent = async () => { + activeAgentGenerators++; + try { + do { + pendingRun = false; + const generator = this.runAsync({ + userId: params.userId, + sessionId: params.sessionId, + stateDelta: params.stateDelta, + runConfig: params.runConfig, + }); + for await (const event of generator) { + outputQueue.push(event); + } + } while (pendingRun); + } catch (e) { + outputQueue.close(e as Error); + } finally { + activeAgentGenerators--; + } + }; + + // Helper to consume incoming user stream + const consumeInput = async () => { + try { + const session = await this.sessionService.getSession({ + appName: this.appName, + userId: params.userId, + sessionId: params.sessionId, + }); + + if (!session) { + throw new Error(`Session not found: ${params.sessionId}`); + } + + for await (const newMessage of params.inputStream) { + if (!newMessage || !newMessage.parts?.length) continue; + + // 1. Immediately inject the event into the session events. + // This allows mid-generation concurrency reactivity on the next step. + const userEvent = createEvent({ + invocationId: newInvocationContextId(), + author: 'user', + content: newMessage, + }); + + if (params.runConfig?.saveInputBlobsAsArtifacts) { + await this.saveArtifacts( + userEvent.invocationId, + params.userId, + params.sessionId, + newMessage, + ); + } + + await this.sessionService.appendEvent({ + session, + event: userEvent, + }); + + outputQueue.push(userEvent); // Echo it out as an event + + // 2. Trigger the agent loop if no active generators are handling the session. + if (activeAgentGenerators === 0) { + runAgent(); + } else { + // Let the active generator know it needs to run again if it exits soon. + pendingRun = true; + } + } + } catch (e) { + outputQueue.close(e as Error); + } finally { + outputQueue.close(); + } + }; + + // Start consuming + consumeInput(); + + // Stream multiplexed output + yield* outputQueue; + } // TODO - b/425992518: Implement runLive and related methods. } +class AsyncEventQueue { + private queue: T[] = []; + private resolves: ((val: IteratorResult) => void)[] = []; + private isClosed = false; + private error?: Error; + + push(value: T) { + if (this.resolves.length > 0) { + this.resolves.shift()!({value, done: false}); + } else { + this.queue.push(value); + } + } + + close(error?: Error) { + this.isClosed = true; + this.error = error; + while (this.resolves.length > 0) { + if (error) { + this.resolves.shift()!(Promise.reject(error)); + } else { + this.resolves.shift()!({value: undefined, done: true}); + } + } + } + + async *[Symbol.asyncIterator](): AsyncGenerator { + while (true) { + if (this.queue.length > 0) { + yield this.queue.shift()!; + } else if (this.isClosed) { + if (this.error) throw this.error; + return; + } else { + const result = await new Promise>((resolve) => { + this.resolves.push(resolve); + }); + if (result.done) return; + yield result.value; + } + } + } +} + /** * It iterates through the events in reverse order, and returns the event * containing a function call with a functionCall.id matching the diff --git a/core/test/runner/streaming_runner_test.ts b/core/test/runner/streaming_runner_test.ts index a1bb12fd..2135f487 100644 --- a/core/test/runner/streaming_runner_test.ts +++ b/core/test/runner/streaming_runner_test.ts @@ -137,6 +137,82 @@ describe('Runner Streaming and Ephemeral', () => { }); }); + describe('runStream', () => { + it('should multiplex input stream and agent events', async () => { + let resolveInput: (val: any) => void; + let inputPromise = new Promise((resolve) => { + resolveInput = resolve; + }); + + const inputStream = { + loopActive: true, + push: (val: string) => { + resolveInput(val); + }, + close: () => { + resolveInput(null); + }, + async *[Symbol.asyncIterator]() { + while (this.loopActive) { + const val = await inputPromise; + if (val === null) break; // End of stream + yield {role: 'user', parts: [{text: val}]}; + inputPromise = new Promise((resolve) => { + resolveInput = resolve; + }); + } + }, + }; + + const session = await sessionService.createSession({ + appName: TEST_APP_ID, + userId: TEST_USER_ID, + }); + + const generator = runner.runStream({ + userId: TEST_USER_ID, + sessionId: session.id, + inputStream: inputStream as any, + }); + + const events: Event[] = []; + const consumeEvents = async () => { + for await (const event of generator) { + events.push(event); + } + }; + + // Start consuming events + const consumePromise = consumeEvents(); + + // Push first message + inputStream.push('Hello 1'); + + // Give it a moment to process first message + await new Promise((r) => setTimeout(r, 50)); + + // Push second message + inputStream.push('Hello 2'); + + // Close stream + await new Promise((r) => setTimeout(r, 50)); + inputStream.close(); + + await consumePromise; + + // We should see user events and model events mixed in the output + expect(events.length).toBeGreaterThan(0); + const userEvents = events.filter( + (e) => + e.author === 'user' && e.content?.parts?.[0]?.text?.includes('Hello'), + ); + expect(userEvents.length).toBe(2); + + const modelEvents = events.filter((e) => e.author === rootAgent.name); + expect(modelEvents.length).toBeGreaterThan(0); + }); + }); + describe('toStructuredEvents', () => { it('should convert error events', () => { const event = createEvent({ diff --git a/samples/streaming_input_agent.ts b/samples/streaming_input_agent.ts new file mode 100644 index 00000000..d6607ae5 --- /dev/null +++ b/samples/streaming_input_agent.ts @@ -0,0 +1,174 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { + FunctionTool, + InMemoryRunner, + LlmAgent, + LogLevel, + setLogLevel, +} from '@google/adk'; +import type {Content} from '@google/genai'; +import {createUserContent} from '@google/genai'; +import dotenv from 'dotenv'; +import path from 'path'; +import {fileURLToPath} from 'url'; +import {z} from 'zod'; + +const __dirname = path.dirname(fileURLToPath(import.meta.url)); +dotenv.config({path: path.join(__dirname, '.env'), quiet: true}); + +class ContentInputStream { + private queue: T[] = []; + private resolves: ((val: IteratorResult) => void)[] = []; + private isClosed = false; + + push(content: T) { + if (this.resolves.length > 0) { + this.resolves.shift()!({value: content, done: false}); + } else { + this.queue.push(content); + } + } + + end() { + this.isClosed = true; + while (this.resolves.length > 0) { + this.resolves.shift()!({value: undefined, done: true}); + } + } + + async *[Symbol.asyncIterator](): AsyncGenerator { + while (true) { + if (this.queue.length > 0) { + yield this.queue.shift()!; + } else if (this.isClosed) { + return; + } else { + const result = await new Promise>((resolve) => { + this.resolves.push(resolve); + }); + if (result.done) return; + yield result.value; + } + } + } +} + +interface ToolResult { + status: 'success' | 'error'; + report?: string; + error_message?: string; +} + +const getWeatherTool = new FunctionTool({ + name: 'get_weather', + description: 'Retrieves the current weather report for a specified city.', + parameters: z.object({ + city: z.string().describe('The name of the city.'), + }), + execute: async ({city}: {city: string}): Promise => { + if (city.toLowerCase() === 'new york') { + return { + status: 'success', + report: + 'The weather in New York is sunny with a temperature of 25 degrees Celsius (77 degrees Fahrenheit).', + }; + } + + return { + status: 'error', + error_message: `Weather information for '${city}' is not available.`, + }; + }, +}); + +const llmAgent = new LlmAgent({ + name: 'streaming_input_agent', + description: 'Agent that can answer questions about the weather', + model: 'gemini-2.5-flash', + tools: [getWeatherTool], +}); + +const runner = new InMemoryRunner({ + appName: 'streaming_input_agent', + agent: llmAgent, +}); + +async function main() { + setLogLevel(LogLevel.ERROR); + const session = await runner.sessionService.createSession({ + appName: 'streaming_input_agent', + userId: 'test_user_id', + }); + const inputStream = new ContentInputStream(); + + // Push first message + inputStream.push(createUserContent('Hello')); + + // Start streaming + const responseStream = runner.runStream({ + userId: 'test_user_id', + sessionId: session.id, + inputStream, + }); + + // Push messages after 1 second + setTimeout(() => { + inputStream.push( + createUserContent('I need to know the weather in New York'), + ); + inputStream.push( + createUserContent('And what is the current weather in London?'), + ); + }, 1000); + + // Push message after 2 seconds + setTimeout(() => { + inputStream.push( + createUserContent('And what is the weather in Sunnyvale now?'), + ); + }, 2000); + + for await (const event of responseStream) { + if (event.author === 'user') { + console.log(`-> inputMessage: ${event.content?.parts?.[0]?.text}`); + } else { + if (event.content?.parts?.[0]?.functionCall) { + console.log( + `-> modelResponse (tool call): ${event.content?.parts?.[0]?.functionCall.name} ${JSON.stringify(event.content?.parts?.[0]?.functionCall.args)}`, + ); + } else if (event.content?.parts?.[0]?.functionResponse) { + console.log( + `-> modelResponse (tool response): ${event.content?.parts?.[0]?.functionResponse.name} ${JSON.stringify(event.content?.parts?.[0]?.functionResponse.response)}`, + ); + } else if (event.content?.parts?.[0]?.text) { + console.log(`-> modelResponse: ${event.content?.parts?.[0]?.text}`); + } + } + } +} + +/** + * Expected output: + * + * -> inputMessage: Hello + * -> modelResponse: Hello! I can provide you with weather information. Which city are you interested in? + * -> inputMessage: I need to know the weather in New York + * -> inputMessage: And what is the current weather in London? + * -> modelResponse (tool call): get_weather {"city":"New York"} + * -> modelResponse (tool response): get_weather {"status":"success","report":"The weather in New York is sunny with a temperature of 25 degrees Celsius (77 degrees Fahrenheit)."} + * -> inputMessage: And what is the weather in Sunnyvale now? + * -> modelResponse: The weather in New York is sunny with a temperature of 25 degrees Celsius (77 degrees Fahrenheit). + * -> modelResponse (tool call): get_weather {"city":"London"} + * -> modelResponse (tool response): get_weather {"status":"error","error_message":"Weather information for 'London' is not available."} + * -> modelResponse (tool call): get_weather {"city":"Sunnyvale"} + * -> modelResponse (tool response): get_weather {"status":"error","error_message":"Weather information for 'Sunnyvale' is not available."} + * -> modelResponse: I'm sorry, but I don't have weather information for London or Sunnyvale at the moment. + */ +main().catch((error) => { + console.error(error); +});