Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
262 changes: 205 additions & 57 deletions core/src/runner/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ export class Runner {
*
* @param params.userId The user ID of the session.
* @param params.sessionId The session ID of the session.
* @param params.newMessage A new message to append to the session.
* @param params.newMessage An optional new message to append to the session.
* @param params.stateDelta An optional state delta to apply to the session.
* @param params.runConfig The run config for the agent.
* @yields The events generated by the agent.
Expand All @@ -147,7 +147,7 @@ export class Runner {
async *runAsync(params: {
userId: string;
sessionId: string;
newMessage: Content;
newMessage?: Content;
stateDelta?: Record<string, unknown>;
runConfig?: RunConfig;
}): AsyncGenerator<Event, void, undefined> {
Expand Down Expand Up @@ -208,22 +208,22 @@ export class Runner {
pluginManager: this.pluginManager,
});

// =========================================================================
// Preprocess plugins on user message
// =========================================================================
const pluginUserMessage =
await this.pluginManager.runOnUserMessageCallback({
userMessage: newMessage,
invocationContext,
});
if (pluginUserMessage) {
newMessage = pluginUserMessage as Content;
}

// =========================================================================
// Append user message to session
// =========================================================================
if (newMessage) {
// =========================================================================
// Preprocess plugins on user message
// =========================================================================
const pluginUserMessage =
await this.pluginManager.runOnUserMessageCallback({
userMessage: newMessage,
invocationContext,
});
if (pluginUserMessage) {
newMessage = pluginUserMessage as Content;
}

if (!newMessage.parts?.length) {
throw new Error('No parts in the newMessage.');
}
Expand Down Expand Up @@ -264,52 +264,48 @@ export class Runner {
// =========================================================================
// Run the agent with the plugins (aka hooks to apply in the lifecycle)
// =========================================================================
if (newMessage) {
// =========================================================================
// Run the agent with the plugins (aka hooks to apply in the lifecycle)
// =========================================================================
// Step 1: Run the before_run callbacks to see if we should early exit.
const beforeRunCallbackResponse =
await this.pluginManager.runBeforeRunCallback({
invocationContext,
});
// Step 1: Run the before_run callbacks to see if we should early exit.
const beforeRunCallbackResponse =
await this.pluginManager.runBeforeRunCallback({
invocationContext,
});

if (beforeRunCallbackResponse) {
const earlyExitEvent = createEvent({
invocationId: invocationContext.invocationId,
author: 'model',
content: beforeRunCallbackResponse,
});
// TODO: b/447446338 - In the future, do *not* save live call audio
// content to session This is a feature in Python ADK
await this.sessionService.appendEvent({
session,
event: earlyExitEvent,
});
yield earlyExitEvent;
} else {
// Step 2: Otherwise continue with normal execution
for await (const event of invocationContext.agent.runAsync(
invocationContext,
)) {
if (!event.partial) {
await this.sessionService.appendEvent({session, event});
}
// Step 3: Run the on_event callbacks to optionally modify the event.
const modifiedEvent =
await this.pluginManager.runOnEventCallback({
invocationContext,
event,
});
if (modifiedEvent) {
yield modifiedEvent;
} else {
yield event;
}
if (beforeRunCallbackResponse) {
const earlyExitEvent = createEvent({
invocationId: invocationContext.invocationId,
author: 'model',
content: beforeRunCallbackResponse,
});
// TODO: b/447446338 - In the future, do *not* save live call audio
// content to session This is a feature in Python ADK
await this.sessionService.appendEvent({
session,
event: earlyExitEvent,
});
yield earlyExitEvent;
} else {
// Step 2: Otherwise continue with normal execution
for await (const event of invocationContext.agent.runAsync(
invocationContext,
)) {
if (!event.partial) {
await this.sessionService.appendEvent({session, event});
}
// Step 3: Run the on_event callbacks to optionally modify the event.
const modifiedEvent = await this.pluginManager.runOnEventCallback(
{
invocationContext,
event,
},
);
if (modifiedEvent) {
yield modifiedEvent;
} else {
yield event;
}
// Step 4: Run the after_run callbacks to optionally modify the context.
await this.pluginManager.runAfterRunCallback({invocationContext});
}
// Step 4: Run the after_run callbacks to optionally modify the context.
await this.pluginManager.runAfterRunCallback({invocationContext});
}
},
);
Expand Down Expand Up @@ -435,9 +431,161 @@ export class Runner {
}
return true;
}

/**
* Runs the agent continuously with a stream of input messages.
* This provides a true concurrent multiplexing model, handling live user
* messages safely during active execution block intervals.
*
* @param params.userId The user ID of the session.
* @param params.sessionId The session ID of the session.
* @param params.inputStream The stream of incoming user content.
* @param params.stateDelta An optional state delta.
* @param params.runConfig The run config for the agent.
* @yields The events generated by the agent and echoed from the user.
*/
async *runStream(params: {
userId: string;
sessionId: string;
inputStream: AsyncIterable<Content>;
stateDelta?: Record<string, unknown>;
runConfig?: RunConfig;
}): AsyncGenerator<Event, void, undefined> {
const outputQueue = new AsyncEventQueue<Event>();
let activeAgentGenerators = 0;
let pendingRun = false;

// Helper to start the agent execution generator
const runAgent = async () => {
activeAgentGenerators++;
try {
do {
pendingRun = false;
const generator = this.runAsync({
userId: params.userId,
sessionId: params.sessionId,
stateDelta: params.stateDelta,
runConfig: params.runConfig,
});
for await (const event of generator) {
outputQueue.push(event);
}
} while (pendingRun);
} catch (e) {
outputQueue.close(e as Error);
} finally {
activeAgentGenerators--;
}
};

// Helper to consume incoming user stream
const consumeInput = async () => {
try {
const session = await this.sessionService.getSession({
appName: this.appName,
userId: params.userId,
sessionId: params.sessionId,
});

if (!session) {
throw new Error(`Session not found: ${params.sessionId}`);
}

for await (const newMessage of params.inputStream) {
if (!newMessage || !newMessage.parts?.length) continue;

// 1. Immediately inject the event into the session events.
// This allows mid-generation concurrency reactivity on the next step.
const userEvent = createEvent({
invocationId: newInvocationContextId(),
author: 'user',
content: newMessage,
});

if (params.runConfig?.saveInputBlobsAsArtifacts) {
await this.saveArtifacts(
userEvent.invocationId,
params.userId,
params.sessionId,
newMessage,
);
}

await this.sessionService.appendEvent({
session,
event: userEvent,
});

outputQueue.push(userEvent); // Echo it out as an event

// 2. Trigger the agent loop if no active generators are handling the session.
if (activeAgentGenerators === 0) {
runAgent();
} else {
// Let the active generator know it needs to run again if it exits soon.
pendingRun = true;
}
}
} catch (e) {
outputQueue.close(e as Error);
} finally {
outputQueue.close();
}
};

// Start consuming
consumeInput();

// Stream multiplexed output
yield* outputQueue;
}
// TODO - b/425992518: Implement runLive and related methods.
}

class AsyncEventQueue<T> {
private queue: T[] = [];
private resolves: ((val: IteratorResult<T>) => void)[] = [];
private isClosed = false;
private error?: Error;

push(value: T) {
if (this.resolves.length > 0) {
this.resolves.shift()!({value, done: false});
} else {
this.queue.push(value);
}
}

close(error?: Error) {
this.isClosed = true;
this.error = error;
while (this.resolves.length > 0) {
if (error) {
this.resolves.shift()!(Promise.reject(error));
} else {
this.resolves.shift()!({value: undefined, done: true});
}
}
}

async *[Symbol.asyncIterator](): AsyncGenerator<T, void, undefined> {
while (true) {
if (this.queue.length > 0) {
yield this.queue.shift()!;
} else if (this.isClosed) {
if (this.error) throw this.error;
return;
} else {
const result = await new Promise<IteratorResult<T>>((resolve) => {
this.resolves.push(resolve);
});
if (result.done) return;
yield result.value;
}
}
}
}

/**
* It iterates through the events in reverse order, and returns the event
* containing a function call with a functionCall.id matching the
Expand Down
76 changes: 76 additions & 0 deletions core/test/runner/streaming_runner_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,82 @@ describe('Runner Streaming and Ephemeral', () => {
});
});

describe('runStream', () => {
it('should multiplex input stream and agent events', async () => {
let resolveInput: (val: any) => void;
let inputPromise = new Promise((resolve) => {
resolveInput = resolve;
});

const inputStream = {
loopActive: true,
push: (val: string) => {
resolveInput(val);
},
close: () => {
resolveInput(null);
},
async *[Symbol.asyncIterator]() {
while (this.loopActive) {
const val = await inputPromise;
if (val === null) break; // End of stream
yield {role: 'user', parts: [{text: val}]};
inputPromise = new Promise((resolve) => {
resolveInput = resolve;
});
}
},
};

const session = await sessionService.createSession({
appName: TEST_APP_ID,
userId: TEST_USER_ID,
});

const generator = runner.runStream({
userId: TEST_USER_ID,
sessionId: session.id,
inputStream: inputStream as any,
});

const events: Event[] = [];
const consumeEvents = async () => {
for await (const event of generator) {
events.push(event);
}
};

// Start consuming events
const consumePromise = consumeEvents();

// Push first message
inputStream.push('Hello 1');

// Give it a moment to process first message
await new Promise((r) => setTimeout(r, 50));

// Push second message
inputStream.push('Hello 2');

// Close stream
await new Promise((r) => setTimeout(r, 50));
inputStream.close();

await consumePromise;

// We should see user events and model events mixed in the output
expect(events.length).toBeGreaterThan(0);
const userEvents = events.filter(
(e) =>
e.author === 'user' && e.content?.parts?.[0]?.text?.includes('Hello'),
);
expect(userEvents.length).toBe(2);

const modelEvents = events.filter((e) => e.author === rootAgent.name);
expect(modelEvents.length).toBeGreaterThan(0);
});
});

describe('toStructuredEvents', () => {
it('should convert error events', () => {
const event = createEvent({
Expand Down
Loading
Loading