diff --git a/src/app/components/chat/chat.component.html b/src/app/components/chat/chat.component.html
index b32c494b..4ede2b8c 100644
--- a/src/app/components/chat/chat.component.html
+++ b/src/app/components/chat/chat.component.html
@@ -202,11 +202,23 @@
class="example-margin"
[checked]="enableSseIndicator()"
(change)="toggleSse()"
- [disabled]="!(isTokenStreamingEnabledObs | async)"
+ [disabled]="!(isTokenStreamingEnabledObs | async) || useLiveStreaming"
>
{{ i18n.tokenStreamingLabel }}
+ @if (isLiveStreamingEnabledObs | async) {
+
+
+ {{ i18n.liveStreamingLabel }}
+
+
+ }
{
expect(combinedJson.data).toEqual([a2ui1, a2ui2]);
});
});
+
+ describe('Live Streaming Toggle', () => {
+ beforeEach(async () => {
+ mockFeatureFlagService.isLiveStreamingEnabledResponse.next(true);
+ await fixture.whenStable();
+ fixture.detectChanges();
+ });
+
+ describe('toggleLiveStreaming', () => {
+ it('should start WebSocket connection when enabling live streaming',
+ () => {
+ component.appName = TEST_APP_1_NAME;
+ component.userId = USER_ID;
+ component.sessionId = SESSION_1_ID;
+ component.useLiveStreaming = false;
+
+ component.toggleLiveStreaming();
+
+ expect(component.useLiveStreaming).toBeTrue();
+ expect(mockStreamChatService.startTextStreaming)
+ .toHaveBeenCalledWith({
+ appName: TEST_APP_1_NAME,
+ userId: USER_ID,
+ sessionId: SESSION_1_ID,
+ });
+ expect(mockStreamChatService.getTextMessages).toHaveBeenCalled();
+ });
+
+ it('should disable SSE when enabling live streaming', () => {
+ component.useSse = true;
+ component.useLiveStreaming = false;
+
+ component.toggleLiveStreaming();
+
+ expect(component.useSse).toBeFalse();
+ expect(component.useLiveStreaming).toBeTrue();
+ });
+
+ it('should stop WebSocket connection when disabling live streaming',
+ () => {
+ component.useLiveStreaming = true;
+ mockStreamChatService.startTextStreaming.calls.reset();
+
+ component.toggleLiveStreaming();
+
+ expect(component.useLiveStreaming).toBeFalse();
+ expect(mockStreamChatService.stopTextStreaming).toHaveBeenCalled();
+ });
+
+ it('should show warning when toggling in session with previous bidi usage',
+ () => {
+ component.useLiveStreaming = false;
+ component.sessionId = SESSION_1_ID;
+ (component as any).sessionHasUsedBidi.add(SESSION_1_ID);
+
+ component.toggleLiveStreaming();
+
+ expect(mockSnackBar.open).toHaveBeenCalledWith(
+ jasmine.stringContaining('Restarting'),
+ 'OK',
+ );
+ // Should toggle state but not start connection
+ expect(component.useLiveStreaming).toBeTrue();
+ expect(mockStreamChatService.startTextStreaming)
+ .not.toHaveBeenCalled();
+ });
+ });
+
+ describe('toggleSse', () => {
+ it('should disable live streaming when enabling SSE', () => {
+ component.useSse = false;
+ component.useLiveStreaming = true;
+
+ component.toggleSse();
+
+ expect(component.useSse).toBeTrue();
+ expect(component.useLiveStreaming).toBeFalse();
+ expect(mockStreamChatService.stopTextStreaming).toHaveBeenCalled();
+ });
+
+ it('should not affect live streaming when disabling SSE', () => {
+ component.useSse = true;
+ component.useLiveStreaming = false;
+
+ component.toggleSse();
+
+ expect(component.useSse).toBeFalse();
+ expect(component.useLiveStreaming).toBeFalse();
+ expect(mockStreamChatService.stopTextStreaming)
+ .not.toHaveBeenCalled();
+ });
+ });
+
+ describe('sendMessage with live streaming', () => {
+ it('should send message via WebSocket when live streaming is active',
+ async () => {
+ component.useLiveStreaming = true;
+ component.userInput = TEST_MESSAGE;
+ const mockEvent = new KeyboardEvent('keydown', {key: 'Enter'});
+
+ await component.sendMessage(mockEvent);
+
+ expect(mockStreamChatService.sendTextMessage)
+ .toHaveBeenCalledWith(TEST_MESSAGE);
+ expect(component.userInput).toBe('');
+ expect(mockAgentService.runSse).not.toHaveBeenCalled();
+ });
+
+ it('should clear selected files when sending via live streaming', async () => {
+ component.useLiveStreaming = true;
+ component.userInput = TEST_MESSAGE;
+ const mockFile = new File(['test'], TEST_FILE_NAME);
+ component.selectedFiles = [{file: mockFile, url: 'blob:test'}];
+ const mockEvent = new KeyboardEvent('keydown', {key: 'Enter'});
+
+ await component.sendMessage(mockEvent);
+
+ expect(component.selectedFiles).toEqual([]);
+ expect(mockStreamChatService.sendTextMessage).toHaveBeenCalled();
+ });
+
+ it('should use normal flow when live streaming is disabled', async () => {
+ component.useLiveStreaming = false;
+ component.useSse = false;
+ component.userInput = TEST_MESSAGE;
+ const mockEvent = new KeyboardEvent('keydown', {key: 'Enter'});
+
+ await component.sendMessage(mockEvent);
+
+ expect(mockStreamChatService.sendTextMessage).not.toHaveBeenCalled();
+ expect(mockAgentService.runSse).toHaveBeenCalled();
+ });
+ });
+ });
});
diff --git a/src/app/components/chat/chat.component.ts b/src/app/components/chat/chat.component.ts
index 4dc627cb..147b4cdc 100644
--- a/src/app/components/chat/chat.component.ts
+++ b/src/app/components/chat/chat.component.ts
@@ -214,6 +214,7 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy {
showSidePanel = true;
showBuilderAssistant = true;
useSse = false;
+ useLiveStreaming = false;
currentSessionState: SessionState|undefined = {};
root_agent = ROOT_AGENT;
updatedSessionState: WritableSignal = signal(null);
@@ -291,6 +292,8 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy {
this.featureFlagService.isApplicationSelectorEnabled();
readonly isTokenStreamingEnabledObs: Observable =
this.featureFlagService.isTokenStreamingEnabled();
+ readonly isLiveStreamingEnabledObs: Observable =
+ this.featureFlagService.isLiveStreamingEnabled();
readonly isExportSessionEnabledObs: Observable =
this.featureFlagService.isExportSessionEnabled();
readonly isEventFilteringEnabled =
@@ -537,6 +540,15 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy {
userMessage.text = this.userInput;
}
+ if (this.useLiveStreaming) {
+ const messageText = this.userInput;
+ this.userInput = '';
+ this.selectedFiles = [];
+ this.streamChatService.sendTextMessage(messageText);
+ this.changeDetectorRef.detectChanges();
+ return;
+ }
+
// Add user message attachments
if (this.selectedFiles.length > 0) {
const messageAttachments = this.selectedFiles.map((file) => ({
@@ -680,7 +692,7 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy {
chunkJson.groundingMetadata.searchEntryPoint.renderedContent;
}
- if (!this.useSse) {
+ if (!this.useSse && !this.useLiveStreaming) {
this.insertMessageBeforeLoadingMessage(this.streamingTextMessage);
this.storeEvents(part, chunkJson);
this.streamingTextMessage = null;
@@ -1915,6 +1927,77 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy {
toggleSse() {
this.useSse = !this.useSse;
+ // If enabling SSE, disable live streaming
+ if (this.useSse && this.useLiveStreaming) {
+ this.useLiveStreaming = false;
+ this.streamChatService.stopTextStreaming();
+ }
+ }
+
+ toggleLiveStreaming() {
+ this.useLiveStreaming = !this.useLiveStreaming;
+ if (this.useLiveStreaming && this.sessionHasUsedBidi.has(this.sessionId)) {
+ this.openSnackBar(BIDI_STREAMING_RESTART_WARNING, 'OK');
+ return;
+ }
+
+ if (this.useLiveStreaming && this.useSse) {
+ this.useSse = false;
+ }
+
+ if (this.useLiveStreaming) {
+ this.streamChatService.startTextStreaming({
+ appName: this.appName,
+ userId: this.userId,
+ sessionId: this.sessionId,
+ });
+
+ this.streamChatService.getTextMessages().subscribe({
+ next: (event: any) => {
+
+ const hasAudioBlob = (event: any) => {
+ if (event.content?.parts) {
+ const hasAudioBlob = event.content.parts.some((part: any) =>
+ part.inlineData?.mimeType?.startsWith('audio/')
+ );
+ return hasAudioBlob
+ }
+ return false;
+ }
+
+ // Process incoming events similar to SSE
+ if (event.error) {
+ this.openSnackBar(event.error, 'OK');
+ return;
+ }
+ if (!hasAudioBlob(event) && event.content) {
+ let parts = this.combineTextParts(event.content.parts);
+ if (this.isEventA2aResponse(event)) {
+ parts = this.combineA2uiDataParts(parts);
+ }
+
+ for (let part of parts) {
+ this.processPart(event, part);
+ this.traceService.setEventData(this.eventData);
+ }
+ } else if (event.errorMessage) {
+ this.processErrorMessage(event);
+ }
+ if (event.actions) {
+ this.processActionArtifact(event);
+ this.processActionStateDelta(event);
+ }
+ this.changeDetectorRef.detectChanges();
+ },
+ error: (err: any) => {
+ console.error('Live streaming error:', err);
+ this.openSnackBar(err, 'OK');
+ },
+ });
+ this.sessionHasUsedBidi.add(this.sessionId);
+ } else {
+ this.streamChatService.stopTextStreaming();
+ }
}
enterBuilderMode() {
diff --git a/src/app/core/services/feature-flag.service.ts b/src/app/core/services/feature-flag.service.ts
index e7906342..4f743471 100644
--- a/src/app/core/services/feature-flag.service.ts
+++ b/src/app/core/services/feature-flag.service.ts
@@ -75,6 +75,10 @@ export class FeatureFlagService implements FeatureFlagServiceInterface {
return of(true);
}
+ isLiveStreamingEnabled(): Observable {
+ return of(true);
+ }
+
isMessageFileUploadEnabled(): Observable {
return of(true);
}
diff --git a/src/app/core/services/interfaces/feature-flag.ts b/src/app/core/services/interfaces/feature-flag.ts
index 45ef7e81..5f5d373c 100644
--- a/src/app/core/services/interfaces/feature-flag.ts
+++ b/src/app/core/services/interfaces/feature-flag.ts
@@ -39,6 +39,7 @@ export declare abstract class FeatureFlagService {
abstract isArtifactsTabEnabled(): Observable;
abstract isEvalEnabled(): Observable;
abstract isTokenStreamingEnabled(): Observable;
+ abstract isLiveStreamingEnabled(): Observable;
abstract isMessageFileUploadEnabled(): Observable;
abstract isManualStateUpdateEnabled(): Observable;
abstract isBidiStreamingEnabled(): Observable;
diff --git a/src/app/core/services/interfaces/stream-chat.ts b/src/app/core/services/interfaces/stream-chat.ts
index 4237e185..860fd8ed 100644
--- a/src/app/core/services/interfaces/stream-chat.ts
+++ b/src/app/core/services/interfaces/stream-chat.ts
@@ -22,7 +22,7 @@ export const STREAM_CHAT_SERVICE =
new InjectionToken('StreamChatService');
/**
- * Service for supporting live streaming with audio/video.
+ * Service for supporting live streaming with audio/video/text.
*/
export declare abstract class StreamChatService {
abstract startAudioChat(options: {
@@ -38,6 +38,14 @@ export declare abstract class StreamChatService {
videoContainer: ElementRef;
}): Promise;
abstract stopVideoChat(videoContainer: ElementRef): void;
+ abstract startTextStreaming(options: {
+ appName: string;
+ userId: string;
+ sessionId: string;
+ }): void;
+ abstract stopTextStreaming(): void;
+ abstract sendTextMessage(text: string): void;
+ abstract getTextMessages(): Observable;
abstract onStreamClose(): Observable;
abstract closeStream(): void;
}
diff --git a/src/app/core/services/stream-chat.service.spec.ts b/src/app/core/services/stream-chat.service.spec.ts
index d943876e..281de1f1 100644
--- a/src/app/core/services/stream-chat.service.spec.ts
+++ b/src/app/core/services/stream-chat.service.spec.ts
@@ -18,6 +18,7 @@
import {ElementRef} from '@angular/core';
import {TestBed} from '@angular/core/testing';
// 1p-ONLY-IMPORTS: import {beforeEach, describe, expect, it,}
+import {firstValueFrom, Subject, toArray} from 'rxjs';
import {URLUtil} from '../../../utils/url-util';
import {fakeAsync,
@@ -31,6 +32,7 @@ import {MockVideoService} from './testing/mock-video.service';
import {MockWebSocketService} from './testing/mock-websocket.service';
import {VIDEO_SERVICE} from './interfaces/video';
import {WEBSOCKET_SERVICE} from './interfaces/websocket';
+import {createFakeLlmResponse} from '../models/testing/fake_genai_types';
describe('StreamChatService', () => {
let service: StreamChatService;
@@ -44,6 +46,7 @@ describe('StreamChatService', () => {
initTestBed(); // required for 1p compat
spyOn(URLUtil, 'getWSServerUrl').and.returnValue('localhost:9876');
mockWebSocketService = new MockWebSocketService();
+ (mockWebSocketService as any).socket$ = new Subject();
mockAudioRecordingService = new MockAudioRecordingService();
mockVideoService = new MockVideoService();
videoContainer = new ElementRef(document.createElement('div'));
@@ -263,4 +266,120 @@ describe('StreamChatService', () => {
expect(mockWebSocketService.sendMessage).toHaveBeenCalledTimes(2);
}));
});
+
+ describe('Text Streaming', () => {
+ const TEXT_STREAMING_CONFIG = {
+ appName: 'test-app',
+ userId: 'test-user',
+ sessionId: 'test-session'
+ };
+
+ describe('startTextStreaming', () => {
+ it('should connect to WebSocket with correct URL', () => {
+ service.startTextStreaming(TEXT_STREAMING_CONFIG);
+
+ expect(mockWebSocketService.connect)
+ .toHaveBeenCalledWith(
+ 'ws://localhost:9876/run_live?app_name=test-app&user_id=test-user&session_id=test-session');
+ });
+
+ it('should subscribe to WebSocket messages', () => {
+ service.startTextStreaming(TEXT_STREAMING_CONFIG);
+
+ expect(mockWebSocketService.getMessages).toHaveBeenCalled();
+ });
+
+ it('should parse and emit incoming JSON messages', () => {
+ const fakeResponse = createFakeLlmResponse();
+ service.startTextStreaming(TEXT_STREAMING_CONFIG);
+
+ let receivedMessage: any;
+ service.getTextMessages().subscribe((message) => {
+ receivedMessage = message;
+ });
+
+ mockWebSocketService.getMessagesResponse.next(
+ JSON.stringify(fakeResponse));
+
+ expect(receivedMessage).toEqual(fakeResponse);
+ });
+ });
+
+ describe('stopTextStreaming', () => {
+ it('should close WebSocket connection', () => {
+ service.stopTextStreaming();
+
+ expect(mockWebSocketService.closeConnection).toHaveBeenCalled();
+ });
+ });
+
+ describe('sendTextMessage', () => {
+ it('should format message in LiveRequest structure', () => {
+ const socketSpy = spyOn(
+ (mockWebSocketService as any).socket$, 'next');
+
+ service.startTextStreaming(TEXT_STREAMING_CONFIG);
+ service.sendTextMessage('Hello world');
+
+ expect(socketSpy).toHaveBeenCalledWith({
+ content: {
+ parts: [{text: 'Hello world'}]
+ }
+ });
+ });
+
+ it('should log error when streaming is not active', () => {
+ spyOn(console, 'error');
+
+ service.sendTextMessage('test');
+
+ expect(console.error)
+ .toHaveBeenCalledWith('Text streaming is not active');
+ });
+ });
+
+ describe('getTextMessages', () => {
+ it('should emit multiple messages from WebSocket stream', () => {
+ const fakeResponse1 = createFakeLlmResponse();
+ const fakeResponse2 = createFakeLlmResponse({
+ content: {role: 'model', parts: [{text: 'fake response 2'}]},
+ });
+
+ service.startTextStreaming(TEXT_STREAMING_CONFIG);
+
+ const receivedMessages: any[] = [];
+ service.getTextMessages().subscribe((message) => {
+ receivedMessages.push(message);
+ });
+
+ mockWebSocketService.getMessagesResponse.next(
+ JSON.stringify(fakeResponse1));
+ mockWebSocketService.getMessagesResponse.next(
+ JSON.stringify(fakeResponse2));
+
+ expect(receivedMessages).toEqual([fakeResponse1, fakeResponse2]);
+ });
+
+ it('should handle JSON parsing errors gracefully', () => {
+ spyOn(console, 'error');
+
+ service.startTextStreaming(TEXT_STREAMING_CONFIG);
+
+ let errorThrown = false;
+ service.getTextMessages().subscribe({
+ next: () => {},
+ error: () => {
+ errorThrown = true;
+ }
+ });
+
+ mockWebSocketService.getMessagesResponse.next('invalid-json');
+
+ expect(console.error)
+ .toHaveBeenCalledWith(
+ 'Error parsing WebSocket message:', jasmine.any(Error));
+ expect(errorThrown).toBeFalse();
+ });
+ });
+ });
});
diff --git a/src/app/core/services/stream-chat.service.ts b/src/app/core/services/stream-chat.service.ts
index e01aaf08..8b9522db 100644
--- a/src/app/core/services/stream-chat.service.ts
+++ b/src/app/core/services/stream-chat.service.ts
@@ -16,6 +16,7 @@
*/
import {ElementRef, inject, Injectable} from '@angular/core';
+import {Observable, Subject} from 'rxjs';
import {URLUtil} from '../../../utils/url-util';
import {LiveRequest} from '../models/LiveRequest';
@@ -28,7 +29,7 @@ import {VideoService} from './video.service';
import {WebSocketService} from './websocket.service';
/**
- * Service for supporting live streaming with audio/video.
+ * Service for supporting live streaming with audio/video/text.
*/
@Injectable({
providedIn: 'root',
@@ -39,6 +40,8 @@ export class StreamChatService implements StreamChatServiceInterface {
private readonly webSocketService = inject(WEBSOCKET_SERVICE);
private audioIntervalId: number|undefined = undefined;
private videoIntervalId: number|undefined = undefined;
+ private textMessagesSubject = new Subject();
+ private isTextStreamingActive = false;
constructor() {}
@@ -146,11 +149,66 @@ export class StreamChatService implements StreamChatServiceInterface {
this.videoService.stopRecording(videoContainer);
}
+ startTextStreaming({
+ appName,
+ userId,
+ sessionId,
+ }: {appName: string; userId: string; sessionId: string;}) {
+ const protocol = window.location.protocol === 'https:' ? 'wss' : 'ws';
+ this.webSocketService.connect(
+ `${protocol}://${URLUtil.getWSServerUrl()}/run_live?app_name=${
+ appName}&user_id=${userId}&session_id=${sessionId}`,
+ );
+ this.isTextStreamingActive = true;
+
+ // Subscribe to incoming WebSocket messages and forward to text stream
+ this.webSocketService.getMessages().subscribe((message) => {
+ try {
+ const parsedMessage = JSON.parse(message);
+ this.textMessagesSubject.next(parsedMessage);
+ } catch (error) {
+ console.error('Error parsing WebSocket message:', error);
+ }
+ });
+ }
+
+ stopTextStreaming() {
+ this.isTextStreamingActive = false;
+ this.webSocketService.closeConnection();
+ }
+
+ sendTextMessage(text: string) {
+ if (!this.isTextStreamingActive) {
+ console.error('Text streaming is not active');
+ return;
+ }
+
+ // Send text message in LiveRequest format expected by the backend
+ // Based on LiveRequest model: content field with types.Content structure
+ const liveRequest = {
+ content: {
+ parts: [
+ {
+ text: text
+ }
+ ]
+ }
+ };
+
+ // Use WebSocket's underlying socket to send raw JSON
+ (this.webSocketService as any).socket$.next(liveRequest);
+ }
+
+ getTextMessages(): Observable {
+ return this.textMessagesSubject.asObservable();
+ }
+
onStreamClose() {
return this.webSocketService.onCloseReason();
}
closeStream() {
+ this.isTextStreamingActive = false;
this.webSocketService.closeConnection();
}
}
diff --git a/src/app/core/services/testing/mock-feature-flag.service.ts b/src/app/core/services/testing/mock-feature-flag.service.ts
index 5e8cfb82..f80caf0e 100644
--- a/src/app/core/services/testing/mock-feature-flag.service.ts
+++ b/src/app/core/services/testing/mock-feature-flag.service.ts
@@ -58,6 +58,10 @@ export class MockFeatureFlagService implements FeatureFlagService {
isTokenStreamingEnabled =
jasmine.createSpy('isTokenStreamingEnabled')
.and.returnValue(this.isTokenStreamingEnabledResponse);
+ isLiveStreamingEnabledResponse = new ReplaySubject(1);
+ isLiveStreamingEnabled =
+ jasmine.createSpy('isLiveStreamingEnabled')
+ .and.returnValue(this.isLiveStreamingEnabledResponse);
isMessageFileUploadEnabledResponse = new ReplaySubject(1);
isMessageFileUploadEnabled =
jasmine.createSpy('isMessageFileUploadEnabled')
diff --git a/src/app/core/services/testing/mock-stream-chat.service.ts b/src/app/core/services/testing/mock-stream-chat.service.ts
index 086152da..529e6578 100644
--- a/src/app/core/services/testing/mock-stream-chat.service.ts
+++ b/src/app/core/services/testing/mock-stream-chat.service.ts
@@ -26,6 +26,12 @@ export class MockStreamChatService implements Partial {
stopAudioChat = jasmine.createSpy('stopAudioChat');
startVideoChat = jasmine.createSpy('startVideoChat');
stopVideoChat = jasmine.createSpy('stopVideoChat');
+ startTextStreaming = jasmine.createSpy('startTextStreaming');
+ stopTextStreaming = jasmine.createSpy('stopTextStreaming');
+ sendTextMessage = jasmine.createSpy('sendTextMessage');
+ textMessagesResponse = new ReplaySubject(1);
+ getTextMessages = jasmine.createSpy('getTextMessages')
+ .and.returnValue(this.textMessagesResponse);
closeStream = jasmine.createSpy('closeStream');
onStreamCloseResponse = new ReplaySubject(1);
onStreamClose = jasmine.createSpy('onStreamClose')