diff --git a/extensions/copilot/src/platform/endpoint/node/responsesApi.ts b/extensions/copilot/src/platform/endpoint/node/responsesApi.ts index a71a44623ef2f..2ce10375fa62e 100644 --- a/extensions/copilot/src/platform/endpoint/node/responsesApi.ts +++ b/extensions/copilot/src/platform/endpoint/node/responsesApi.ts @@ -138,6 +138,24 @@ interface LatestCompactionOutput { readonly outputIndex: number; } +type CompactionResponseOutputItem = OpenAI.Responses.ResponseOutputItem & OpenAIContextManagementResponse; + +interface CompactionItemInChunk { + readonly item: OpenAIContextManagementResponse; + readonly outputIndex: number | undefined; +} + +interface ResponseStreamEventWithOutputItem { + readonly item: unknown; + readonly output_index: number; +} + +interface ResponseStreamEventWithResponseOutput { + readonly response: { + readonly output: OpenAI.Responses.ResponseOutputItem[]; + }; +} + function resolveWebSocketStatefulMarker(accessor: ServicesAccessor, options: ICreateEndpointBodyOptions): string | undefined { if (options.ignoreStatefulMarker || !options.useWebSocket || !options.conversationId) { return undefined; @@ -519,8 +537,24 @@ function responseFunctionOutputToRawContents(output: string | OpenAI.Responses.R return coalesce(output.map(responseContentToRawContent)); } -function isCompactionOutputItem(item: OpenAI.Responses.ResponseOutputItem): boolean { - return item.type.toString() === openAIContextManagementCompactionType; +function isCompactionItem(value: unknown): value is OpenAIContextManagementResponse { + return typeof value === 'object' && value !== null && 'type' in value && String(value.type) === openAIContextManagementCompactionType; +} + +function hasOutputItem(chunk: OpenAI.Responses.ResponseStreamEvent): chunk is OpenAI.Responses.ResponseStreamEvent & ResponseStreamEventWithOutputItem { + return 'item' in chunk && 'output_index' in chunk && typeof chunk.output_index === 'number'; +} + +function hasResponseOutput(chunk: OpenAI.Responses.ResponseStreamEvent): chunk is OpenAI.Responses.ResponseStreamEvent & ResponseStreamEventWithResponseOutput { + return 'response' in chunk && Array.isArray(chunk.response.output); +} + +function getOutputItemIndex(chunk: ResponseStreamEventWithOutputItem): number { + return chunk.output_index; +} + +function isCompactionOutputItem(item: OpenAI.Responses.ResponseOutputItem): item is CompactionResponseOutputItem { + return isCompactionItem(item); } function getLatestCompactionOutput(output: OpenAI.Responses.ResponseOutputItem[], preferredOutputIndex: number | undefined): LatestCompactionOutput | undefined { @@ -528,7 +562,7 @@ function getLatestCompactionOutput(output: OpenAI.Responses.ResponseOutputItem[] for (let idx = output.length - 1; idx >= 0; idx--) { const item = output[idx]; if (isCompactionOutputItem(item)) { - latestCompactionOutput = { item: item as unknown as OpenAIContextManagementResponse, outputIndex: idx }; + latestCompactionOutput = { item, outputIndex: idx }; break; } } @@ -536,7 +570,7 @@ function getLatestCompactionOutput(output: OpenAI.Responses.ResponseOutputItem[] if (preferredOutputIndex !== undefined) { const preferredItem = output[preferredOutputIndex]; if (preferredItem && isCompactionOutputItem(preferredItem) && (!latestCompactionOutput || preferredOutputIndex >= latestCompactionOutput.outputIndex)) { - return { item: preferredItem as unknown as OpenAIContextManagementResponse, outputIndex: preferredOutputIndex }; + return { item: preferredItem, outputIndex: preferredOutputIndex }; } } @@ -615,11 +649,61 @@ export class OpenAIResponsesProcessor { @ILogService private readonly logService: ILogService, ) { } + private getCompactionItemsInChunk(chunk: OpenAI.Responses.ResponseStreamEvent): CompactionItemInChunk[] { + const compactionItems: CompactionItemInChunk[] = []; + + if (hasOutputItem(chunk) && isCompactionItem(chunk.item)) { + const outputIndex = getOutputItemIndex(chunk); + compactionItems.push({ item: chunk.item, outputIndex }); + } + + if (hasResponseOutput(chunk)) { + for (let idx = 0; idx < chunk.response.output.length; idx++) { + const item = chunk.response.output[idx]; + if (isCompactionItem(item)) { + compactionItems.push({ item, outputIndex: idx }); + } + } + } + + return compactionItems; + } + + private captureCompactionItem(item: OpenAIContextManagementResponse, outputIndex: number | undefined, onProgress: (delta: IResponseDelta) => undefined): void { + if (outputIndex !== undefined && this.latestCompactionOutputIndex !== undefined && outputIndex < this.latestCompactionOutputIndex) { + return; + } + + const previousCompactionItem = this.latestCompactionItem; + this.sawCompactionMessage = true; + this.latestCompactionOutputIndex = outputIndex ?? this.latestCompactionOutputIndex; + this.latestCompactionItem = item; + + if (previousCompactionItem?.id === item.id && previousCompactionItem.encrypted_content === item.encrypted_content) { + return; + } + + onProgress({ + text: '', + contextManagement: { + type: openAIContextManagementCompactionType, + id: item.id, + encrypted_content: item.encrypted_content, + } + }); + } + public push(chunk: OpenAI.Responses.ResponseStreamEvent, _onProgress: FinishedCallback): ChatCompletion | undefined { const onProgress = (delta: IResponseDelta): undefined => { this.textAccumulator += delta.text; _onProgress(this.textAccumulator, 0, delta); }; + const compactionItems = this.getCompactionItemsInChunk(chunk); + if (chunk.type !== 'response.completed') { + for (const { item, outputIndex } of compactionItems) { + this.captureCompactionItem(item, outputIndex, onProgress); + } + } switch (chunk.type) { case 'error': @@ -662,23 +746,6 @@ export class OpenAIResponsesProcessor { return; } case 'response.output_item.done': - if (chunk.item.type.toString() === openAIContextManagementCompactionType) { - const compactionItem = chunk.item as unknown as OpenAIContextManagementResponse; - if (this.latestCompactionOutputIndex !== undefined && chunk.output_index < this.latestCompactionOutputIndex) { - return; - } - this.latestCompactionOutputIndex = chunk.output_index; - this.latestCompactionItem = compactionItem; - this.sawCompactionMessage = true; - return onProgress({ - text: '', - contextManagement: { - type: openAIContextManagementCompactionType, - id: compactionItem.id, - encrypted_content: compactionItem.encrypted_content, - } - }); - } if (chunk.item.type === 'function_call') { this.toolCallInfo.delete(chunk.output_index); onProgress({ diff --git a/extensions/copilot/src/platform/endpoint/node/test/responsesApi.spec.ts b/extensions/copilot/src/platform/endpoint/node/test/responsesApi.spec.ts index e9a3db8a4953c..c1c0064d12a92 100644 --- a/extensions/copilot/src/platform/endpoint/node/test/responsesApi.spec.ts +++ b/extensions/copilot/src/platform/endpoint/node/test/responsesApi.spec.ts @@ -548,6 +548,11 @@ describe('processResponseFromChatEndpoint telemetry', () => { const olderCompaction = createCompactionResponse('cmp_old', 'enc_old'); const newerCompaction = createCompactionResponse('cmp_new', 'enc_new'); + const compactionAddedEvent = { + type: 'response.output_item.added', + output_index: 0, + item: olderCompaction, + }; const compactionEvent = { type: 'response.output_item.done', output_index: 0, @@ -577,7 +582,7 @@ describe('processResponseFromChatEndpoint telemetry', () => { } }; - const response = createFakeStreamResponse(`data: ${JSON.stringify(compactionEvent)}\n\ndata: ${JSON.stringify(completedEvent)}\n\n`); + const response = createFakeStreamResponse(`data: ${JSON.stringify(compactionAddedEvent)}\n\ndata: ${JSON.stringify(compactionEvent)}\n\ndata: ${JSON.stringify(completedEvent)}\n\n`); const telemetryData = TelemetryData.createAndMarkAsIssued({ modelCallId: 'model-call-latest-compaction' }, {}); const stream = await processResponseFromChatEndpoint( @@ -683,6 +688,85 @@ describe('processResponseFromChatEndpoint telemetry', () => { services.dispose(); }); + it('captures compaction returned before output_item.done for the next request', async () => { + const services = createPlatformServices(); + const accessor = services.createTestingAccessor(); + const instantiationService = accessor.get(IInstantiationService); + const logService = accessor.get(ILogService); + const telemetryService = new SpyingTelemetryService(); + const streamedCompactions: OpenAIContextManagementResponse[] = []; + + const earlyCompaction = createCompactionResponse('cmp_early', 'enc_early'); + const compactionAddedEvent = { + type: 'response.output_item.added', + output_index: 0, + item: earlyCompaction, + }; + const completedEvent = { + type: 'response.completed', + response: { + id: 'resp_early_compaction', + model: 'gpt-5-mini', + created_at: 123, + usage: { + input_tokens: 1200, + output_tokens: 9, + total_tokens: 1209, + input_tokens_details: { cached_tokens: 0 }, + output_tokens_details: { reasoning_tokens: 0 }, + }, + output: [ + { + type: 'message', + content: [{ type: 'output_text', text: 'reply' }], + }, + ], + } + }; + + const response = createFakeStreamResponse(`data: ${JSON.stringify(compactionAddedEvent)}\n\ndata: ${JSON.stringify(completedEvent)}\n\n`); + const telemetryData = TelemetryData.createAndMarkAsIssued({ modelCallId: 'model-call-early-compaction' }, {}); + + const stream = await processResponseFromChatEndpoint( + instantiationService, + telemetryService, + logService, + response, + 1, + async (_text, _unused, delta) => { + if (delta.contextManagement && isOpenAIContextManagementResponse(delta.contextManagement)) { + streamedCompactions.push(delta.contextManagement); + } + return undefined; + }, + telemetryData, + 1000 + ); + + for await (const _ of stream) { + // consume stream + } + + expect(streamedCompactions.map(item => item.id)).toEqual(['cmp_early']); + + const body = instantiationService.invokeFunction(servicesAccessor => createResponsesRequestBody(servicesAccessor, createRequestOptions([ + createCompactionAssistantMessage(streamedCompactions[streamedCompactions.length - 1]), + { + role: Raw.ChatRole.User, + content: [{ type: Raw.ChatCompletionContentPartKind.Text, text: 'continue' }], + }, + ], false), testEndpoint.model, testEndpoint)); + + expect(body.input).toContainEqual({ + type: openAIContextManagementCompactionType, + id: 'cmp_early', + encrypted_content: 'enc_early', + }); + + accessor.dispose(); + services.dispose(); + }); + it('emits telemetry when the server returns a compaction item', async () => { const services = createPlatformServices(); const accessor = services.createTestingAccessor();