Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion src/app/components/chat/chat.component.html
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,23 @@
class="example-margin"
[checked]="enableSseIndicator()"
(change)="toggleSse()"
[disabled]="!(isTokenStreamingEnabledObs | async)"
[disabled]="!(isTokenStreamingEnabledObs | async) || useLiveStreaming"
>
{{ i18n.tokenStreamingLabel }}
</mat-slide-toggle>
</div>
@if (isLiveStreamingEnabledObs | async) {
<div class="toolbar-sse-toggle" style="margin-left: 8px;">
<mat-slide-toggle
class="example-margin"
[checked]="useLiveStreaming"
(change)="toggleLiveStreaming()"
[disabled]="useSse"
>
{{ i18n.liveStreamingLabel }}
</mat-slide-toggle>
</div>
}
<mat-divider
[vertical]="true"
style="margin-left: 8px; margin-right: 8px; height: 22px"
Expand Down
1 change: 1 addition & 0 deletions src/app/components/chat/chat.component.i18n.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export const CHAT_MESSAGES = {
userIdLabel: 'User ID',
loadingSessionLabel: 'Loading session...',
tokenStreamingLabel: 'Token Streaming',
liveStreamingLabel: 'Live Streaming',
createNewSessionTooltip: 'Create a new Session',
newSessionButton: 'New Session',
deleteSessionTooltip: 'Delete current session',
Expand Down
134 changes: 134 additions & 0 deletions src/app/components/chat/chat.component.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1748,4 +1748,138 @@ describe('ChatComponent', () => {
expect(combinedJson.data).toEqual([a2ui1, a2ui2]);
});
});

describe('Live Streaming Toggle', () => {
beforeEach(async () => {
mockFeatureFlagService.isLiveStreamingEnabledResponse.next(true);
await fixture.whenStable();
fixture.detectChanges();
});

describe('toggleLiveStreaming', () => {
it('should start WebSocket connection when enabling live streaming',
() => {
component.appName = TEST_APP_1_NAME;
component.userId = USER_ID;
component.sessionId = SESSION_1_ID;
component.useLiveStreaming = false;

component.toggleLiveStreaming();

expect(component.useLiveStreaming).toBeTrue();
expect(mockStreamChatService.startTextStreaming)
.toHaveBeenCalledWith({
appName: TEST_APP_1_NAME,
userId: USER_ID,
sessionId: SESSION_1_ID,
});
expect(mockStreamChatService.getTextMessages).toHaveBeenCalled();
});

it('should disable SSE when enabling live streaming', () => {
component.useSse = true;
component.useLiveStreaming = false;

component.toggleLiveStreaming();

expect(component.useSse).toBeFalse();
expect(component.useLiveStreaming).toBeTrue();
});

it('should stop WebSocket connection when disabling live streaming',
() => {
component.useLiveStreaming = true;
mockStreamChatService.startTextStreaming.calls.reset();

component.toggleLiveStreaming();

expect(component.useLiveStreaming).toBeFalse();
expect(mockStreamChatService.stopTextStreaming).toHaveBeenCalled();
});

it('should show warning when toggling in session with previous bidi usage',
() => {
component.useLiveStreaming = false;
component.sessionId = SESSION_1_ID;
(component as any).sessionHasUsedBidi.add(SESSION_1_ID);

component.toggleLiveStreaming();

expect(mockSnackBar.open).toHaveBeenCalledWith(
jasmine.stringContaining('Restarting'),
'OK',
);
// Should toggle state but not start connection
expect(component.useLiveStreaming).toBeTrue();
expect(mockStreamChatService.startTextStreaming)
.not.toHaveBeenCalled();
});
});

describe('toggleSse', () => {
it('should disable live streaming when enabling SSE', () => {
component.useSse = false;
component.useLiveStreaming = true;

component.toggleSse();

expect(component.useSse).toBeTrue();
expect(component.useLiveStreaming).toBeFalse();
expect(mockStreamChatService.stopTextStreaming).toHaveBeenCalled();
});

it('should not affect live streaming when disabling SSE', () => {
component.useSse = true;
component.useLiveStreaming = false;

component.toggleSse();

expect(component.useSse).toBeFalse();
expect(component.useLiveStreaming).toBeFalse();
expect(mockStreamChatService.stopTextStreaming)
.not.toHaveBeenCalled();
});
});

describe('sendMessage with live streaming', () => {
it('should send message via WebSocket when live streaming is active',
async () => {
component.useLiveStreaming = true;
component.userInput = TEST_MESSAGE;
const mockEvent = new KeyboardEvent('keydown', {key: 'Enter'});

await component.sendMessage(mockEvent);

expect(mockStreamChatService.sendTextMessage)
.toHaveBeenCalledWith(TEST_MESSAGE);
expect(component.userInput).toBe('');
expect(mockAgentService.runSse).not.toHaveBeenCalled();
});

it('should clear selected files when sending via live streaming', async () => {
component.useLiveStreaming = true;
component.userInput = TEST_MESSAGE;
const mockFile = new File(['test'], TEST_FILE_NAME);
component.selectedFiles = [{file: mockFile, url: 'blob:test'}];
const mockEvent = new KeyboardEvent('keydown', {key: 'Enter'});

await component.sendMessage(mockEvent);

expect(component.selectedFiles).toEqual([]);
expect(mockStreamChatService.sendTextMessage).toHaveBeenCalled();
});

it('should use normal flow when live streaming is disabled', async () => {
component.useLiveStreaming = false;
component.useSse = false;
component.userInput = TEST_MESSAGE;
const mockEvent = new KeyboardEvent('keydown', {key: 'Enter'});

await component.sendMessage(mockEvent);

expect(mockStreamChatService.sendTextMessage).not.toHaveBeenCalled();
expect(mockAgentService.runSse).toHaveBeenCalled();
});
});
});
});
85 changes: 84 additions & 1 deletion src/app/components/chat/chat.component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy {
showSidePanel = true;
showBuilderAssistant = true;
useSse = false;
useLiveStreaming = false;
currentSessionState: SessionState|undefined = {};
root_agent = ROOT_AGENT;
updatedSessionState: WritableSignal<any> = signal(null);
Expand Down Expand Up @@ -291,6 +292,8 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy {
this.featureFlagService.isApplicationSelectorEnabled();
readonly isTokenStreamingEnabledObs: Observable<boolean> =
this.featureFlagService.isTokenStreamingEnabled();
readonly isLiveStreamingEnabledObs: Observable<boolean> =
this.featureFlagService.isLiveStreamingEnabled();
readonly isExportSessionEnabledObs: Observable<boolean> =
this.featureFlagService.isExportSessionEnabled();
readonly isEventFilteringEnabled =
Expand Down Expand Up @@ -537,6 +540,15 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy {
userMessage.text = this.userInput;
}

if (this.useLiveStreaming) {
const messageText = this.userInput;
this.userInput = '';
this.selectedFiles = [];
this.streamChatService.sendTextMessage(messageText);
this.changeDetectorRef.detectChanges();
return;
}

// Add user message attachments
if (this.selectedFiles.length > 0) {
const messageAttachments = this.selectedFiles.map((file) => ({
Expand Down Expand Up @@ -680,7 +692,7 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy {
chunkJson.groundingMetadata.searchEntryPoint.renderedContent;
}

if (!this.useSse) {
if (!this.useSse && !this.useLiveStreaming) {
this.insertMessageBeforeLoadingMessage(this.streamingTextMessage);
this.storeEvents(part, chunkJson);
this.streamingTextMessage = null;
Expand Down Expand Up @@ -1915,6 +1927,77 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy {

toggleSse() {
this.useSse = !this.useSse;
// If enabling SSE, disable live streaming
if (this.useSse && this.useLiveStreaming) {
this.useLiveStreaming = false;
this.streamChatService.stopTextStreaming();
}
}

toggleLiveStreaming() {
this.useLiveStreaming = !this.useLiveStreaming;
if (this.useLiveStreaming && this.sessionHasUsedBidi.has(this.sessionId)) {
this.openSnackBar(BIDI_STREAMING_RESTART_WARNING, 'OK');
return;
}

if (this.useLiveStreaming && this.useSse) {
this.useSse = false;
}

if (this.useLiveStreaming) {
this.streamChatService.startTextStreaming({
appName: this.appName,
userId: this.userId,
sessionId: this.sessionId,
});

this.streamChatService.getTextMessages().subscribe({
next: (event: any) => {

const hasAudioBlob = (event: any) => {
if (event.content?.parts) {
const hasAudioBlob = event.content.parts.some((part: any) =>
part.inlineData?.mimeType?.startsWith('audio/')
);
return hasAudioBlob
}
return false;
}

// Process incoming events similar to SSE
if (event.error) {
this.openSnackBar(event.error, 'OK');
return;
}
if (!hasAudioBlob(event) && event.content) {
let parts = this.combineTextParts(event.content.parts);
if (this.isEventA2aResponse(event)) {
parts = this.combineA2uiDataParts(parts);
}

for (let part of parts) {
this.processPart(event, part);
this.traceService.setEventData(this.eventData);
}
} else if (event.errorMessage) {
this.processErrorMessage(event);
}
if (event.actions) {
this.processActionArtifact(event);
this.processActionStateDelta(event);
}
this.changeDetectorRef.detectChanges();
},
error: (err: any) => {
console.error('Live streaming error:', err);
this.openSnackBar(err, 'OK');
},
});
this.sessionHasUsedBidi.add(this.sessionId);
} else {
this.streamChatService.stopTextStreaming();
}
}

enterBuilderMode() {
Expand Down
4 changes: 4 additions & 0 deletions src/app/core/services/feature-flag.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ export class FeatureFlagService implements FeatureFlagServiceInterface {
return of(true);
}

isLiveStreamingEnabled(): Observable<boolean> {
return of(true);
}

isMessageFileUploadEnabled(): Observable<boolean> {
return of(true);
}
Expand Down
1 change: 1 addition & 0 deletions src/app/core/services/interfaces/feature-flag.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export declare abstract class FeatureFlagService {
abstract isArtifactsTabEnabled(): Observable<boolean>;
abstract isEvalEnabled(): Observable<boolean>;
abstract isTokenStreamingEnabled(): Observable<boolean>;
abstract isLiveStreamingEnabled(): Observable<boolean>;
abstract isMessageFileUploadEnabled(): Observable<boolean>;
abstract isManualStateUpdateEnabled(): Observable<boolean>;
abstract isBidiStreamingEnabled(): Observable<boolean>;
Expand Down
10 changes: 9 additions & 1 deletion src/app/core/services/interfaces/stream-chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export const STREAM_CHAT_SERVICE =
new InjectionToken<StreamChatService>('StreamChatService');

/**
* Service for supporting live streaming with audio/video.
* Service for supporting live streaming with audio/video/text.
*/
export declare abstract class StreamChatService {
abstract startAudioChat(options: {
Expand All @@ -38,6 +38,14 @@ export declare abstract class StreamChatService {
videoContainer: ElementRef;
}): Promise<void>;
abstract stopVideoChat(videoContainer: ElementRef): void;
abstract startTextStreaming(options: {
appName: string;
userId: string;
sessionId: string;
}): void;
abstract stopTextStreaming(): void;
abstract sendTextMessage(text: string): void;
abstract getTextMessages(): Observable<any>;
abstract onStreamClose(): Observable<string>;
abstract closeStream(): void;
}
Loading