diff --git a/src/app/components/chat-panel/chat-panel.component.html b/src/app/components/chat-panel/chat-panel.component.html index 9cd2958a..ab8a74c3 100644 --- a/src/app/components/chat-panel/chat-panel.component.html +++ b/src/app/components/chat-panel/chat-panel.component.html @@ -166,6 +166,7 @@ alt="image" [src]="message.inlineData.data" (click)="openViewImageDialog.emit(message.inlineData.data)" + (load)="scrollToBottom()" /> } @case (MediaType.AUDIO) { @@ -470,7 +471,7 @@ class="audio-rec-btn" [class.recording]="isAudioRecording" [matTooltip]="isAudioRecording ? i18n.turnOffMicTooltip : i18n.useMicTooltip" - [disabled]="!(isBidiStreamingEnabledObs | async)" + [disabled]="!(isBidiStreamingEnabledObs | async) || !useLive" > mic @@ -481,7 +482,7 @@ class="video-rec-btn" [class.recording]="isVideoRecording" [matTooltip]="isVideoRecording ? i18n.turnOffCamTooltip : i18n.useCamTooltip" - [disabled]="!(isBidiStreamingEnabledObs | async)" + [disabled]="!(isBidiStreamingEnabledObs | async) || !useLive" > videocam diff --git a/src/app/components/chat-panel/chat-panel.component.ts b/src/app/components/chat-panel/chat-panel.component.ts index 37a81977..04ca2850 100644 --- a/src/app/components/chat-panel/chat-panel.component.ts +++ b/src/app/components/chat-panel/chat-panel.component.ts @@ -99,6 +99,7 @@ export class ChatPanelComponent implements OnChanges, AfterViewInit { @Input() updatedSessionState: any|null = null; @Input() eventData = new Map(); @Input() selectedEvent: any = undefined; + @Input() useLive: boolean = false; @Input() isAudioRecording: boolean = false; @Input() isVideoRecording: boolean = false; @Input() hoveredEventMessageIndices: number[] = []; diff --git a/src/app/components/chat/chat.component.html b/src/app/components/chat/chat.component.html index c8d277c7..4a2ed188 100644 --- a/src/app/components/chat/chat.component.html +++ b/src/app/components/chat/chat.component.html @@ -197,11 +197,21 @@ @if (isSessionLoading === false) {
-
+
+ {{ i18n.liveLabel }} + +
+
+ {{ i18n.tokenStreamingLabel }} @@ -293,6 +303,7 @@ [updatedSessionState]="updatedSessionState()" [eventData]="eventData" [selectedEvent]="selectedEvent" + [useLive]="useLive" [isAudioRecording]="isAudioRecording" [isVideoRecording]="isVideoRecording" [hoveredEventMessageIndices]="hoveredEventMessageIndices" diff --git a/src/app/components/chat/chat.component.i18n.ts b/src/app/components/chat/chat.component.i18n.ts index 29a6f67b..3830a1fa 100644 --- a/src/app/components/chat/chat.component.i18n.ts +++ b/src/app/components/chat/chat.component.i18n.ts @@ -31,6 +31,7 @@ export const CHAT_MESSAGES = { userIdLabel: 'User ID', loadingSessionLabel: 'Loading session...', tokenStreamingLabel: 'Token Streaming', + liveLabel: 'SSE / Live', createNewSessionTooltip: 'Create a new Session', newSessionButton: 'New Session', deleteSessionTooltip: 'Delete current session', diff --git a/src/app/components/chat/chat.component.scss b/src/app/components/chat/chat.component.scss index 99bdd920..6ccbc9db 100644 --- a/src/app/components/chat/chat.component.scss +++ b/src/app/components/chat/chat.component.scss @@ -536,7 +536,43 @@ button { align-items: center; } -.toolbar-sse-toggle { +.toolbar-live-toggle { + @include mat.slide-toggle-overrides( + ( + label-text-size: 14px, + label-text-color: var(--chat-toolbar-sse-toggle-label-text-color), + unselected-track-color: + var(--chat-toolbar-sse-toggle-unselected-track-color), + unselected-focus-track-color: + var(--chat-toolbar-sse-toggle-unselected-track-color), + unselected-hover-track-color: + var(--chat-toolbar-sse-toggle-unselected-track-color), + unselected-handle-color: + var(--chat-toolbar-sse-toggle-unselected-handle-color), + unselected-focus-handle-color: + var(--chat-toolbar-sse-toggle-unselected-handle-color), + unselected-hover-handle-color: + var(--chat-toolbar-sse-toggle-unselected-handle-color), + selected-track-color: var(--chat-toolbar-sse-toggle-selected-track-color), + selected-focus-track-color: + var(--chat-toolbar-sse-toggle-selected-track-color), + selected-hover-track-color: + var(--chat-toolbar-sse-toggle-selected-track-color), + selected-handle-color: + var(--chat-toolbar-sse-toggle-selected-handle-color), + selected-focus-handle-color: + var(--chat-toolbar-sse-toggle-selected-handle-color), + selected-hover-handle-color: + var(--chat-toolbar-sse-toggle-selected-handle-color), + track-height: 24px, + track-width: 46px, + track-outline-color: var(--chat-toolbar-sse-toggle-track-outline-color), + with-icon-handle-size: 20px, + ) + ); +} + +.toolbar-streaming-toggle { @include mat.slide-toggle-overrides( ( label-text-size: 14px, diff --git a/src/app/components/chat/chat.component.ts b/src/app/components/chat/chat.component.ts index e833fc55..f552edda 100644 --- a/src/app/components/chat/chat.component.ts +++ b/src/app/components/chat/chat.component.ts @@ -117,9 +117,6 @@ class CustomPaginatorIntl extends MatPaginatorIntl { }; } -const BIDI_STREAMING_RESTART_WARNING = - 'Restarting bidirectional streaming is not currently supported. Please refresh the page or start a new session.'; - @Component({ selector: 'app-chat', templateUrl: './chat.component.html', @@ -183,7 +180,8 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy { sidePanel = viewChild.required(SidePanelComponent); evalTab = viewChild(EvalTabComponent); bottomPanelRef = viewChild.required('bottomPanel'); - enableSseIndicator = signal(false); + enableStreamingIndicator = signal(false); + enableLiveIndicator = signal(false); isChatMode = signal(true); isEvalCaseEditing = signal(false); hasEvalCaseChanged = signal(false); @@ -212,16 +210,14 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy { redirectUri = URLUtil.getBaseUrlWithoutPath(); showSidePanel = true; showBuilderAssistant = true; - useSse = false; + useStreaming = false; + useLive = false; currentSessionState: SessionState|undefined = {}; root_agent = ROOT_AGENT; updatedSessionState: WritableSignal = signal(null); private readonly isModelThinkingSubject = new BehaviorSubject(false); protected readonly canEditSession = signal(true); - // TODO: Remove this once backend supports restarting bidi streaming. - sessionHasUsedBidi = new Set(); - eventData = new Map(); traceData: any[] = []; renderedEventGraph: SafeHtml|undefined; @@ -290,6 +286,8 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy { this.featureFlagService.isApplicationSelectorEnabled(); readonly isTokenStreamingEnabledObs: Observable = this.featureFlagService.isTokenStreamingEnabled(); + readonly isBidiStreamingEnabledObs: Observable = + this.featureFlagService.isBidiStreamingEnabled(); readonly isExportSessionEnabledObs: Observable = this.featureFlagService.isExportSessionEnabled(); readonly isEventFilteringEnabled = @@ -423,6 +421,56 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy { if (!this.isApplicationSelectorEnabled()) { this.loadSessionByUrlOrReset(); } + + this.streamChatService.getMessages().subscribe((chunkJson: AdkEvent) => this.handleAdkEvent(chunkJson)); + } + + async handleAdkEvent(chunkJson: AdkEvent) { + if (chunkJson.error) { + this.openSnackBar(chunkJson.error, 'OK'); + return; + } + if (chunkJson.inputTranscription && (!!chunkJson.partial === this.useStreaming)) { + chunkJson.content = { + role: 'user', + parts: [ + { + text: chunkJson.inputTranscription.text + } + ] + } + } + if (chunkJson.outputTranscription && (!!chunkJson.partial === this.useStreaming)) { + chunkJson.content = { + role: 'bot', + parts: [ + { + text: chunkJson.outputTranscription.text + } + ] + } + } + if (chunkJson.content && (!!chunkJson.partial === this.useStreaming)) { + let parts = this.combineTextParts(chunkJson.content.parts); + if (this.isEventA2aResponse(chunkJson)) { + parts = this.combineA2uiDataParts(parts); + } + + for (let part of parts) { + if (part.inlineData && part.inlineData.mimeType && part.inlineData.mimeType.startsWith('audio/pcm')) { + continue; + } + this.processPart(chunkJson, part); + this.traceService.setEventData(this.eventData); + } + } else if (chunkJson.errorMessage) { + this.processErrorMessage(chunkJson); + } + if (chunkJson.actions) { + this.processActionArtifact(chunkJson); + this.processActionStateDelta(chunkJson); + } + this.changeDetectorRef.detectChanges(); } selectApp(appName: string) { @@ -570,65 +618,46 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy { role: 'user', parts: await this.getUserMessageParts(), }, - streaming: this.useSse, + streaming: this.useStreaming, stateDelta: this.updatedSessionState(), }; this.selectedFiles = []; this.streamingTextMessage = null; - this.agentService.runSse(req).subscribe({ - next: async (chunkJson: AdkEvent) => { - if (chunkJson.error) { - this.openSnackBar(chunkJson.error, 'OK'); - return; - } - if (chunkJson.content) { - let parts = this.combineTextParts(chunkJson.content.parts); - if (this.isEventA2aResponse(chunkJson)) { - parts = this.combineA2uiDataParts(parts); - } - - for (let part of parts) { - this.processPart(chunkJson, part); - this.traceService.setEventData(this.eventData); + if (!this.useLive) { + this.agentService.runSse(req).subscribe({ + next: (chunkJson: AdkEvent) => this.handleAdkEvent(chunkJson), + error: (err) => { + console.error('Send message error:', err); + this.openSnackBar(err, 'OK'); + }, + complete: () => { + if (this.updatedSessionState()) { + this.currentSessionState = this.updatedSessionState(); + this.updatedSessionState.set(null); } - } else if (chunkJson.errorMessage) { - this.processErrorMessage(chunkJson); - } - if (chunkJson.actions) { - this.processActionArtifact(chunkJson); - this.processActionStateDelta(chunkJson); - } - this.changeDetectorRef.detectChanges(); - }, - error: (err) => { - console.error('Send message error:', err); - this.openSnackBar(err, 'OK'); - }, - complete: () => { - if (this.updatedSessionState()) { - this.currentSessionState = this.updatedSessionState(); - this.updatedSessionState.set(null); - } - this.streamingTextMessage = null; - this.featureFlagService.isSessionReloadOnNewMessageEnabled() - .pipe(first()) - .subscribe((enabled) => { - if (enabled) { - this.sessionTab?.reloadSession(this.sessionId); - } - }); - this.eventService.getTrace(this.sessionId) - .pipe(first(), catchError((error) => { - return of([]); - })) - .subscribe((res) => { - this.traceData = res; - this.changeDetectorRef.detectChanges(); - }); - this.traceService.setMessages(this.messages()); - this.changeDetectorRef.detectChanges(); - }, - }); + this.streamingTextMessage = null; + this.featureFlagService.isSessionReloadOnNewMessageEnabled() + .pipe(first()) + .subscribe((enabled) => { + if (enabled) { + this.sessionTab?.reloadSession(this.sessionId); + } + }); + this.eventService.getTrace(this.sessionId) + .pipe(first(), catchError((error) => { + return of([]); + })) + .subscribe((res) => { + this.traceData = res; + this.changeDetectorRef.detectChanges(); + }); + this.traceService.setMessages(this.messages()); + this.changeDetectorRef.detectChanges(); + }, + }); + } else { + this.streamChatService.sendMessage(req); + } // Clear input this.userInput = ''; // Clear the query param for the initial user input once it is sent. @@ -679,7 +708,7 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy { chunkJson.groundingMetadata.searchEntryPoint.renderedContent; } - if (!this.useSse) { + if (!this.useStreaming) { this.insertMessageBeforeLoadingMessage(this.streamingTextMessage); this.storeEvents(part, chunkJson); this.streamingTextMessage = null; @@ -714,7 +743,7 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy { } } else if (!part.thought) { // Skip partial events for non-text parts to avoid duplicates - if (this.useSse && chunkJson.partial) { + if (this.useStreaming && chunkJson.partial) { return; } @@ -1059,7 +1088,7 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy { private insertMessageBeforeLoadingMessage(message: any) { this.messages.update((messages) => { // If SSE streaming is enabled and this is a text message with eventId - if (this.useSse && message.text && message.eventId && + if (this.useStreaming && message.text && message.eventId && message.role === 'bot') { // Find existing streaming message with the same eventId const existingIndex = messages.findIndex( @@ -1392,24 +1421,12 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy { } startAudioRecording() { - if (this.sessionHasUsedBidi.has(this.sessionId)) { - this.openSnackBar(BIDI_STREAMING_RESTART_WARNING, 'OK'); - return; - } - this.isAudioRecording = true; this.streamChatService.startAudioChat({ appName: this.appName, userId: this.userId, sessionId: this.sessionId, }); - this.messages.update( - messages => - [...messages, - {role: 'user', text: 'Speaking...'}, - {role: 'bot', text: 'Speaking...'}, - ]); - this.sessionHasUsedBidi.add(this.sessionId); } stopAudioRecording() { @@ -1423,10 +1440,6 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy { } startVideoRecording() { - if (this.sessionHasUsedBidi.has(this.sessionId)) { - this.openSnackBar(BIDI_STREAMING_RESTART_WARNING, 'OK'); - return; - } const videoContainer = this.chatPanel()?.videoContainer; if (!videoContainer) { return; @@ -1438,9 +1451,6 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy { sessionId: this.sessionId, videoContainer, }); - this.messages.update( - messages => [...messages, {role: 'user', text: 'Speaking...'}]); - this.sessionHasUsedBidi.add(this.sessionId); } stopVideoRecording() { @@ -1637,6 +1647,17 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy { eventId: event.id }; + if (event.inputTranscription) { + event.content = { + role: 'user', + parts: [ + { + text: event.inputTranscription.text + } + ] + } + } + event.content?.parts?.forEach((part: any) => { this.processPartIntoMessage(part, event, userMessage); }); @@ -1655,6 +1676,17 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy { eventId: event.id }; + if (event.outputTranscription) { + event.content = { + role: 'bot', + parts: [ + { + text: event.outputTranscription.text + } + ] + } + } + event.content?.parts?.forEach((part: any) => { this.processPartIntoMessage(part, event, botMessage); }); @@ -1905,8 +1937,16 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy { this.selectedFiles.splice(index, 1); } - toggleSse() { - this.useSse = !this.useSse; + toggleStreaming() { + this.useStreaming = !this.useStreaming; + } + + toggleLive() { + this.useLive = !this.useLive; + if (this.useLive) { + this.enableStreamingIndicator.set(true); + this.useStreaming = true; + } } enterBuilderMode() { diff --git a/src/app/core/models/types.ts b/src/app/core/models/types.ts index 411fa074..003d05a0 100644 --- a/src/app/core/models/types.ts +++ b/src/app/core/models/types.ts @@ -76,6 +76,15 @@ export declare interface LlmResponse { errorMessage?: string; errorCode?: string; longRunningToolIds?: string[]; + outputTranscription?: { + text: string; + finished: boolean; + }; + inputTranscription?: { + text: string; + finished: boolean; + }; + partial?: boolean; } export declare interface EventActions { diff --git a/src/app/core/services/interfaces/stream-chat.ts b/src/app/core/services/interfaces/stream-chat.ts index 4237e185..84396f66 100644 --- a/src/app/core/services/interfaces/stream-chat.ts +++ b/src/app/core/services/interfaces/stream-chat.ts @@ -17,6 +17,8 @@ import {ElementRef, InjectionToken} from '@angular/core'; import {Observable} from 'rxjs'; +import {LlmResponse} from '../../models/types'; +import {AgentRunRequest} from "../../models/AgentRunRequest"; export const STREAM_CHAT_SERVICE = new InjectionToken('StreamChatService'); @@ -40,4 +42,6 @@ export declare abstract class StreamChatService { abstract stopVideoChat(videoContainer: ElementRef): void; abstract onStreamClose(): Observable; abstract closeStream(): void; + abstract getMessages(): Observable; + abstract sendMessage(req: AgentRunRequest): void; } diff --git a/src/app/core/services/stream-chat.service.ts b/src/app/core/services/stream-chat.service.ts index e01aaf08..4672b8fb 100644 --- a/src/app/core/services/stream-chat.service.ts +++ b/src/app/core/services/stream-chat.service.ts @@ -26,6 +26,8 @@ import {VIDEO_SERVICE} from './interfaces/video'; import {WEBSOCKET_SERVICE} from './interfaces/websocket'; import {VideoService} from './video.service'; import {WebSocketService} from './websocket.service'; +import {map,filter} from "rxjs/operators"; +import {AgentRunRequest} from "../models/AgentRunRequest"; /** * Service for supporting live streaming with audio/video. @@ -39,26 +41,43 @@ export class StreamChatService implements StreamChatServiceInterface { private readonly webSocketService = inject(WEBSOCKET_SERVICE); private audioIntervalId: number|undefined = undefined; private videoIntervalId: number|undefined = undefined; + private currentUrl: string|undefined = undefined; constructor() {} - async startAudioChat({ + connect({ appName, userId, sessionId, }: {appName: string; userId: string; sessionId: string;}) { const protocol = window.location.protocol === 'https:' ? 'wss' : 'ws'; - this.webSocketService.connect( - `${protocol}://${URLUtil.getWSServerUrl()}/run_live?app_name=${ - appName}&user_id=${userId}&session_id=${sessionId}`, - ); + const url = `${protocol}://${URLUtil.getWSServerUrl()}/run_live?app_name=${ + appName}&user_id=${userId}&session_id=${sessionId}`; + if (this.currentUrl !== url) { + this.webSocketService.connect(url); + this.currentUrl = url; + } + } + sendMessage(req: AgentRunRequest): void { + this.connect({appName: req.appName, userId: req.userId, sessionId: req.sessionId!}); + const request: LiveRequest = { + content: req.newMessage, + }; + this.webSocketService.sendMessage(request); + } + + async startAudioChat({ + appName, + userId, + sessionId, + }: {appName: string; userId: string; sessionId: string;}) { + this.connect({ appName, userId, sessionId }); await this.startAudioStreaming(); } stopAudioChat() { this.stopAudioStreaming(); - this.webSocketService.closeConnection(); } private async startAudioStreaming() { @@ -99,12 +118,7 @@ export class StreamChatService implements StreamChatServiceInterface { appName: string; userId: string; sessionId: string; videoContainer: ElementRef }) { - const protocol = window.location.protocol === 'https:' ? 'wss' : 'ws'; - this.webSocketService.connect( - `${protocol}://${URLUtil.getWSServerUrl()}/run_live?app_name=${ - appName}&user_id=${userId}&session_id=${sessionId}`, - ); - + this.connect({ appName, userId, sessionId }); await this.startAudioStreaming(); await this.startVideoStreaming(videoContainer); } @@ -112,7 +126,6 @@ export class StreamChatService implements StreamChatServiceInterface { stopVideoChat(videoContainer: ElementRef) { this.stopAudioStreaming(); this.stopVideoStreaming(videoContainer); - this.webSocketService.closeConnection(); } private async startVideoStreaming(videoContainer: ElementRef) { @@ -153,4 +166,10 @@ export class StreamChatService implements StreamChatServiceInterface { closeStream() { this.webSocketService.closeConnection(); } + getMessages() { + return this.webSocketService.getMessages().pipe( + filter(msg => !!msg), + map(msg => JSON.parse(msg)), + ) + } } diff --git a/src/app/core/services/websocket.service.ts b/src/app/core/services/websocket.service.ts index 13a8a879..733ed514 100644 --- a/src/app/core/services/websocket.service.ts +++ b/src/app/core/services/websocket.service.ts @@ -66,7 +66,9 @@ export class WebSocketService implements WebSocketServiceInterface { } sendMessage(data: LiveRequest) { - data.blob.data = this.arrayBufferToBase64(data.blob.data.buffer); + if (data.blob?.data) { + data.blob.data = this.arrayBufferToBase64(data.blob.data.buffer); + } if (!this.socket$ || this.socket$.closed) { console.error('WebSocket is not open.'); return; @@ -99,12 +101,14 @@ export class WebSocketService implements WebSocketServiceInterface { private handleIncomingAudio(message: any) { const msg = JSON.parse(message) as Event; if ( - msg['content'] && - msg['content']['parts'] && - msg['content']['parts'][0]['inlineData'] + msg.content && + msg.content.parts && + msg.content.parts[0] && + msg.content.parts[0].inlineData && + msg.content.parts[0].inlineData.mimeType?.startsWith('audio/pcm') ) { const pcmBytes = this.base64ToUint8Array( - msg['content']['parts'][0]['inlineData']['data'], + msg.content.parts[0].inlineData.data, ); this.audioBuffer.push(pcmBytes); }