Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 88 additions & 21 deletions extensions/copilot/src/platform/endpoint/node/responsesApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,24 @@ interface LatestCompactionOutput {
readonly outputIndex: number;
}

// An output item that has been verified (via isCompactionOutputItem) to also carry
// the context-management compaction payload on top of the standard output shape.
type CompactionResponseOutputItem = OpenAI.Responses.ResponseOutputItem & OpenAIContextManagementResponse;

// A compaction item discovered inside a stream chunk, together with the output
// index it appeared at (undefined when the event carries no index).
interface CompactionItemInChunk {
	readonly item: OpenAIContextManagementResponse;
	readonly outputIndex: number | undefined;
}

// Structural shape of stream events that carry a single output item plus its
// index (e.g. response.output_item.added / response.output_item.done).
interface ResponseStreamEventWithOutputItem {
	readonly item: unknown;
	readonly output_index: number;
}

// Structural shape of stream events that carry a full response snapshot with an
// output array (e.g. response.completed).
interface ResponseStreamEventWithResponseOutput {
	readonly response: {
		readonly output: OpenAI.Responses.ResponseOutputItem[];
	};
}

function resolveWebSocketStatefulMarker(accessor: ServicesAccessor, options: ICreateEndpointBodyOptions): string | undefined {
if (options.ignoreStatefulMarker || !options.useWebSocket || !options.conversationId) {
return undefined;
Expand Down Expand Up @@ -519,24 +537,40 @@ function responseFunctionOutputToRawContents(output: string | OpenAI.Responses.R
return coalesce(output.map(responseContentToRawContent));
}

function isCompactionOutputItem(item: OpenAI.Responses.ResponseOutputItem): boolean {
return item.type.toString() === openAIContextManagementCompactionType;
/**
 * Runtime check for a context-management compaction payload: any non-null object
 * whose `type` stringifies to the compaction marker. Accepts `unknown` so it can
 * be applied to raw stream-event items before they are typed.
 */
function isCompactionItem(value: unknown): value is OpenAIContextManagementResponse {
	if (typeof value !== 'object' || value === null) {
		return false;
	}
	// String() mirrors the loose comparison the wire format requires: the SDK's
	// declared type union does not include the compaction type.
	return 'type' in value && String(value.type) === openAIContextManagementCompactionType;
}

/**
 * Narrows a stream event to one that carries a single output item plus a numeric
 * `output_index` (e.g. response.output_item.added / .done events).
 */
function hasOutputItem(chunk: OpenAI.Responses.ResponseStreamEvent): chunk is OpenAI.Responses.ResponseStreamEvent & ResponseStreamEventWithOutputItem {
	if (!('item' in chunk) || !('output_index' in chunk)) {
		return false;
	}
	return typeof chunk.output_index === 'number';
}

/**
 * Narrows a stream event to one that carries a full response snapshot with an
 * `output` array (e.g. the response.completed event).
 *
 * Uses optional chaining on `response`: `'response' in chunk` only proves the key
 * exists, so a chunk with `response: null`/`undefined` would otherwise throw a
 * TypeError on the `.output` dereference instead of returning false.
 */
function hasResponseOutput(chunk: OpenAI.Responses.ResponseStreamEvent): chunk is OpenAI.Responses.ResponseStreamEvent & ResponseStreamEventWithResponseOutput {
	return 'response' in chunk && Array.isArray(chunk.response?.output);
}

/**
 * Thin accessor for the event's output index so callers don't reach into the
 * raw wire-format field name directly.
 */
function getOutputItemIndex(chunk: ResponseStreamEventWithOutputItem): number {
	const { output_index: outputIndex } = chunk;
	return outputIndex;
}

// Narrows a standard ResponseOutputItem to one that also carries the compaction
// payload; the runtime check is delegated to isCompactionItem so both the typed
// and the untyped (unknown) paths share one implementation.
function isCompactionOutputItem(item: OpenAI.Responses.ResponseOutputItem): item is CompactionResponseOutputItem {
	return isCompactionItem(item);
}

function getLatestCompactionOutput(output: OpenAI.Responses.ResponseOutputItem[], preferredOutputIndex: number | undefined): LatestCompactionOutput | undefined {
let latestCompactionOutput: LatestCompactionOutput | undefined;
for (let idx = output.length - 1; idx >= 0; idx--) {
const item = output[idx];
if (isCompactionOutputItem(item)) {
latestCompactionOutput = { item: item as unknown as OpenAIContextManagementResponse, outputIndex: idx };
latestCompactionOutput = { item, outputIndex: idx };
break;
}
}

if (preferredOutputIndex !== undefined) {
const preferredItem = output[preferredOutputIndex];
if (preferredItem && isCompactionOutputItem(preferredItem) && (!latestCompactionOutput || preferredOutputIndex >= latestCompactionOutput.outputIndex)) {
return { item: preferredItem as unknown as OpenAIContextManagementResponse, outputIndex: preferredOutputIndex };
return { item: preferredItem, outputIndex: preferredOutputIndex };
}
}

Expand Down Expand Up @@ -615,11 +649,61 @@ export class OpenAIResponsesProcessor {
@ILogService private readonly logService: ILogService,
) { }

/**
 * Collects every compaction item present in a single stream chunk, whether it
 * arrives as a standalone output item (with its output_index) or embedded in a
 * full response snapshot's output array (indexed by array position).
 */
private getCompactionItemsInChunk(chunk: OpenAI.Responses.ResponseStreamEvent): CompactionItemInChunk[] {
	const found: CompactionItemInChunk[] = [];

	// Case 1: the event streams a single output item that is itself a compaction.
	if (hasOutputItem(chunk) && isCompactionItem(chunk.item)) {
		found.push({ item: chunk.item, outputIndex: getOutputItemIndex(chunk) });
	}

	// Case 2: the event carries a response snapshot whose output array may
	// contain compaction items at arbitrary positions.
	if (hasResponseOutput(chunk)) {
		chunk.response.output.forEach((entry, idx) => {
			if (isCompactionItem(entry)) {
				found.push({ item: entry, outputIndex: idx });
			}
		});
	}

	return found;
}

private captureCompactionItem(item: OpenAIContextManagementResponse, outputIndex: number | undefined, onProgress: (delta: IResponseDelta) => undefined): void {
if (outputIndex !== undefined && this.latestCompactionOutputIndex !== undefined && outputIndex < this.latestCompactionOutputIndex) {
return;
}

const previousCompactionItem = this.latestCompactionItem;
this.sawCompactionMessage = true;
this.latestCompactionOutputIndex = outputIndex ?? this.latestCompactionOutputIndex;
this.latestCompactionItem = item;

if (previousCompactionItem?.id === item.id && previousCompactionItem.encrypted_content === item.encrypted_content) {
return;
}

onProgress({
text: '',
contextManagement: {
type: openAIContextManagementCompactionType,
id: item.id,
encrypted_content: item.encrypted_content,
}
});
}

public push(chunk: OpenAI.Responses.ResponseStreamEvent, _onProgress: FinishedCallback): ChatCompletion | undefined {
const onProgress = (delta: IResponseDelta): undefined => {
this.textAccumulator += delta.text;
_onProgress(this.textAccumulator, 0, delta);
};
const compactionItems = this.getCompactionItemsInChunk(chunk);
if (chunk.type !== 'response.completed') {
for (const { item, outputIndex } of compactionItems) {
this.captureCompactionItem(item, outputIndex, onProgress);
}
}

switch (chunk.type) {
case 'error':
Expand Down Expand Up @@ -662,23 +746,6 @@ export class OpenAIResponsesProcessor {
return;
}
case 'response.output_item.done':
if (chunk.item.type.toString() === openAIContextManagementCompactionType) {
const compactionItem = chunk.item as unknown as OpenAIContextManagementResponse;
if (this.latestCompactionOutputIndex !== undefined && chunk.output_index < this.latestCompactionOutputIndex) {
return;
}
this.latestCompactionOutputIndex = chunk.output_index;
this.latestCompactionItem = compactionItem;
this.sawCompactionMessage = true;
return onProgress({
text: '',
contextManagement: {
type: openAIContextManagementCompactionType,
id: compactionItem.id,
encrypted_content: compactionItem.encrypted_content,
}
});
}
if (chunk.item.type === 'function_call') {
this.toolCallInfo.delete(chunk.output_index);
onProgress({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,11 @@ describe('processResponseFromChatEndpoint telemetry', () => {

const olderCompaction = createCompactionResponse('cmp_old', 'enc_old');
const newerCompaction = createCompactionResponse('cmp_new', 'enc_new');
const compactionAddedEvent = {
type: 'response.output_item.added',
output_index: 0,
item: olderCompaction,
};
const compactionEvent = {
type: 'response.output_item.done',
output_index: 0,
Expand Down Expand Up @@ -577,7 +582,7 @@ describe('processResponseFromChatEndpoint telemetry', () => {
}
};

const response = createFakeStreamResponse(`data: ${JSON.stringify(compactionEvent)}\n\ndata: ${JSON.stringify(completedEvent)}\n\n`);
const response = createFakeStreamResponse(`data: ${JSON.stringify(compactionAddedEvent)}\n\ndata: ${JSON.stringify(compactionEvent)}\n\ndata: ${JSON.stringify(completedEvent)}\n\n`);
const telemetryData = TelemetryData.createAndMarkAsIssued({ modelCallId: 'model-call-latest-compaction' }, {});

const stream = await processResponseFromChatEndpoint(
Expand Down Expand Up @@ -683,6 +688,85 @@ describe('processResponseFromChatEndpoint telemetry', () => {
services.dispose();
});

// Verifies that a compaction delivered early in the stream (via
// response.output_item.added, before any output_item.done) is still captured,
// surfaced through the progress callback, and usable as input for the next
// request body.
it('captures compaction returned before output_item.done for the next request', async () => {
const services = createPlatformServices();
const accessor = services.createTestingAccessor();
const instantiationService = accessor.get(IInstantiationService);
const logService = accessor.get(ILogService);
const telemetryService = new SpyingTelemetryService();
// Collects every compaction the progress callback observes during streaming.
const streamedCompactions: OpenAIContextManagementResponse[] = [];

// The compaction arrives on an output_item.added event at index 0 — i.e.
// before the stream's completed event, and never via output_item.done.
const earlyCompaction = createCompactionResponse('cmp_early', 'enc_early');
const compactionAddedEvent = {
type: 'response.output_item.added',
output_index: 0,
item: earlyCompaction,
};
// The completed event's snapshot deliberately contains only a plain message
// (no compaction), so the compaction must have been captured from the
// earlier added event.
const completedEvent = {
type: 'response.completed',
response: {
id: 'resp_early_compaction',
model: 'gpt-5-mini',
created_at: 123,
usage: {
input_tokens: 1200,
output_tokens: 9,
total_tokens: 1209,
input_tokens_details: { cached_tokens: 0 },
output_tokens_details: { reasoning_tokens: 0 },
},
output: [
{
type: 'message',
content: [{ type: 'output_text', text: 'reply' }],
},
],
}
};

// SSE-framed stream: the added event followed by the completed event.
const response = createFakeStreamResponse(`data: ${JSON.stringify(compactionAddedEvent)}\n\ndata: ${JSON.stringify(completedEvent)}\n\n`);
const telemetryData = TelemetryData.createAndMarkAsIssued({ modelCallId: 'model-call-early-compaction' }, {});

const stream = await processResponseFromChatEndpoint(
instantiationService,
telemetryService,
logService,
response,
1,
// Record any contextManagement deltas the processor emits.
async (_text, _unused, delta) => {
if (delta.contextManagement && isOpenAIContextManagementResponse(delta.contextManagement)) {
streamedCompactions.push(delta.contextManagement);
}
return undefined;
},
telemetryData,
1000
);

for await (const _ of stream) {
// consume stream
}

// The early compaction must be surfaced exactly once.
expect(streamedCompactions.map(item => item.id)).toEqual(['cmp_early']);

// Build the follow-up request using the captured compaction as the assistant
// message, simulating the next turn of the conversation.
const body = instantiationService.invokeFunction(servicesAccessor => createResponsesRequestBody(servicesAccessor, createRequestOptions([
createCompactionAssistantMessage(streamedCompactions[streamedCompactions.length - 1]),
{
role: Raw.ChatRole.User,
content: [{ type: Raw.ChatCompletionContentPartKind.Text, text: 'continue' }],
},
], false), testEndpoint.model, testEndpoint));

// The compaction must round-trip into the next request's input payload.
expect(body.input).toContainEqual({
type: openAIContextManagementCompactionType,
id: 'cmp_early',
encrypted_content: 'enc_early',
});

accessor.dispose();
services.dispose();
});

it('emits telemetry when the server returns a compaction item', async () => {
const services = createPlatformServices();
const accessor = services.createTestingAccessor();
Expand Down
Loading