From bb697b3b5f62c1a20b025687f388ecc290015ad0 Mon Sep 17 00:00:00 2001 From: Audrey Tang Date: Tue, 12 May 2026 06:40:53 -0400 Subject: [PATCH] feat(server): add /v1/responses (OpenAI Responses API) for Codex CLI Implements the Responses API endpoint that Codex CLI (and other modern OpenAI tooling) speaks instead of /v1/chat/completions. The wire format is documented in OpenAI's Responses API; this implementation has been iterated against the Codex CLI binary's SSE parser shape until no remaining schema gaps were found. Request parsing (parse_responses_request, parse_responses_input): - Accepts the typed input array (message, function_call, function_call_output, reasoning, custom_tool_call(_output), local_shell_call(_output), web_search_call(_output), tool_search_call(_output), image_generation_call(_output), compaction, context_compaction). - Maps hosted-tool history to function_call/function_call_output so prior actions survive across turns; rejects unknown item types and non-completed status with 400 to avoid silent context loss. - Strict content-array parsing: only string|null|array of recognized text blocks (input_text/output_text/text/summary_text/ reasoning_text); rejects non-text modalities (input_image/file/ audio) instead of accepting an empty prompt. - Merges adjacent function_call items into the preceding assistant message so text + tool-call turns render as a single assistant block. - Honors reasoning.effort (incl. "minimal"/"none") and gates reasoning summary surface on reasoning.summary opt-in. - Rejects previous_response_id, conversation, and forced tool_choice explicitly (constrained decoding / persisted state not supported). Output (responses_sse_*, responses_final_response): - Emits the full streaming lifecycle: response.created, output_item.added/.done, reasoning_summary_part.added/.done, reasoning_summary_text.delta/.done, content_part.added/.done, output_text.delta/.done, function_call_arguments.delta/.done, response.completed. - Branches the terminal event by finish reason: response.failed for errors and response.incomplete with reason "max_tokens" for length. - Every event carries sequence_number; every output_text part carries annotations:[]; function_call output_item.added ships with an empty arguments string (full args arrive via function_call_arguments.done and output_item.done), and item ids are stable across added/done. - Tracks whether was actually observed so a truncated stream marks the reasoning item incomplete instead of "completed". - Recovers gracefully when the DSML tool parse fails after the model was suppressed at the tool marker: the suppressed tail is flushed as additional output_text deltas so the streamed message matches output_item.done. Tested by 25 rounds of /codex:adversarial-review against the same client this is meant to feed. Co-Authored-By: Claude Opus 4.7 (1M context) --- ds4_server.c | 2027 +++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 1835 insertions(+), 192 deletions(-) diff --git a/ds4_server.c b/ds4_server.c index bc8abbbd..46980d17 100644 --- a/ds4_server.c +++ b/ds4_server.c @@ -475,6 +475,7 @@ typedef enum { typedef enum { API_OPENAI, API_ANTHROPIC, + API_RESPONSES, } api_style; static void random_tool_id(char *dst, size_t dstlen, api_style api) { @@ -580,6 +581,10 @@ typedef struct { ds4_think_mode think_mode; bool has_tools; bool prompt_preserves_reasoning; + /* For /v1/responses: emit reasoning_summary_* events / fields only when the + * client opted in via reasoning.summary. 
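A client opts in by sending, for example, reasoning: {"summary": "auto"}; "concise" and "detailed" also count, while null or an absent field leaves summaries suppressed.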
Other APIs leave this false; the + * field is ignored on those code paths. */ + bool reasoning_summary_emit; tool_replay_stats tool_replay; } request; @@ -703,7 +708,7 @@ static void request_free(request *r) { } static ds4_think_mode think_mode_from_enabled(bool enabled, ds4_think_mode effort) { - if (!enabled) return DS4_THINK_NONE; + if (!enabled || effort == DS4_THINK_NONE) return DS4_THINK_NONE; return effort == DS4_THINK_MAX ? DS4_THINK_MAX : DS4_THINK_HIGH; } @@ -714,11 +719,19 @@ static bool parse_reasoning_effort_name(const char *s, ds4_think_mode *out) { return true; } if (!strcmp(s, "xhigh") || !strcmp(s, "high") || - !strcmp(s, "medium") || !strcmp(s, "low")) + !strcmp(s, "medium") || !strcmp(s, "low") || + !strcmp(s, "minimal")) { + /* DS4 only exposes HIGH and MAX above zero, so "minimal" collapses to + * the smallest non-zero level (HIGH). Callers that need *no* reasoning + * must use "none" instead. */ *out = DS4_THINK_HIGH; return true; } + if (!strcmp(s, "none")) { + *out = DS4_THINK_NONE; + return true; + } return false; } @@ -2351,44 +2364,613 @@ static bool parse_anthropic_request(ds4_engine *e, server *s, const char *body, return false; } -static bool parse_prompt(const char **p, char **out) { +/* Responses API: convert a content-array item (input_text/output_text/text) into a + * concatenated string. Strict shape check: bare string, null, or an array of + * recognized text blocks. Numbers / objects / arrays-of-primitives at the top + * level all reject so the client sees a 400 instead of an answer built on + * silently dropped context. */ +static bool parse_responses_content_array(const char **p, char **out) { json_ws(p); if (**p == '"') return json_string(p, out); - if (**p != '[') { - if (!json_skip_value(p)) return false; + if (json_lit(p, "null")) { *out = xstrdup(""); return true; } + if (**p != '[') { + return false; + } (*p)++; + buf b = {0}; json_ws(p); - if (**p == '"') { - if (!json_string(p, out)) return false; - } else { - *out = xstrdup(""); - if (**p && **p != ']' && !json_skip_value(p)) return false; + while (**p && **p != ']') { + if (**p == '"') { + char *s = NULL; + if (!json_string(p, &s)) goto fail; + buf_puts(&b, s); + free(s); + } else if (**p == '{') { + (*p)++; + char *type = NULL; + char *text = NULL; + json_ws(p); + while (**p && **p != '}') { + char *key = NULL; + if (!json_string(p, &key)) { + free(type); + free(text); + goto fail; + } + json_ws(p); + if (**p != ':') { + free(key); + free(type); + free(text); + goto fail; + } + (*p)++; + if (!strcmp(key, "type")) { + free(type); + if (!json_string(p, &type)) { + free(key); + free(text); + goto fail; + } + } else if (!strcmp(key, "text")) { + free(text); + /* The text field of a typed content block is a plain JSON + * string. Accept null as the empty string for parity with + * upstream serializers that emit null for empty blocks. */ + json_ws(p); + if (json_lit(p, "null")) { + text = xstrdup(""); + } else if (!json_string(p, &text)) { + free(key); + free(type); + goto fail; + } + } else if (!json_skip_value(p)) { + free(key); + free(type); + free(text); + goto fail; + } + free(key); + json_ws(p); + if (**p == ',') (*p)++; + json_ws(p); + } + if (**p != '}') { + free(type); + free(text); + goto fail; + } + (*p)++; + /* Fail closed: a content object must carry a known text-like type + * AND a text field. 
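For example, {"type": "input_text", "text": "hi"} is accepted and its text concatenated into the result.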
Anything else — missing type, missing text, + * image/file/audio types, future schema-drift — is rejected so the + * client gets a 400 instead of an answer built on context the + * server discarded silently. */ + bool is_text_block = type && ( + !strcmp(type, "input_text") || + !strcmp(type, "output_text") || + !strcmp(type, "text") || + !strcmp(type, "summary_text") || + !strcmp(type, "reasoning_text")); + if (!is_text_block || !text) { + free(type); + free(text); + goto fail; + } + buf_puts(&b, text); + free(type); + free(text); + } else { + /* Reject primitives, arrays-of-arrays, nulls: a content array + * element must be either a string or a typed text object. */ + goto fail; + } + json_ws(p); + if (**p == ',') (*p)++; + json_ws(p); } + if (**p != ']') goto fail; + (*p)++; + *out = buf_take(&b); + return true; +fail: + buf_free(&b); + return false; +} + +/* Codex' /v1/responses input items have a `type` discriminator (message, + * function_call, function_call_output, reasoning, custom_tool_call, + * custom_tool_call_output, ...). We collapse them into chat_msgs the same way + * the chat completion / Anthropic parsers do, so the rest of the engine sees a + * single conversation history shape. Reasoning items are merged into the next + * assistant message so render_chat_prompt_text can wrap them in . */ +static bool parse_responses_input(const char **p, chat_msgs *msgs) { + json_ws(p); + if (**p != '[') return false; + (*p)++; + + buf pending_reasoning = {0}; + + json_ws(p); while (**p && **p != ']') { + if (**p != '{') goto fail; + (*p)++; + char *type = NULL; + char *role = NULL; + char *content = NULL; + char *name = NULL; + char *call_id = NULL; + char *arguments = NULL; + char *output = NULL; + char *input_str = NULL; + char *summary = NULL; + char *reasoning_content = NULL; + char *action = NULL; + char *result = NULL; + char *status_str = NULL; json_ws(p); - if (**p == ',') { + while (**p && **p != '}') { + char *key = NULL; + if (!json_string(p, &key)) goto item_fail; + json_ws(p); + if (**p != ':') { + free(key); + goto item_fail; + } (*p)++; - if (!json_skip_value(p)) return false; - } else { - break; + if (!strcmp(key, "type")) { + free(type); + if (!json_string(p, &type)) { + free(key); + goto item_fail; + } + } else if (!strcmp(key, "role")) { + free(role); + if (!json_string(p, &role)) { + free(key); + goto item_fail; + } + } else if (!strcmp(key, "content")) { + free(content); + if (!parse_responses_content_array(p, &content)) { + free(key); + goto item_fail; + } + } else if (!strcmp(key, "name")) { + free(name); + if (!json_string(p, &name)) { + free(key); + goto item_fail; + } + } else if (!strcmp(key, "call_id")) { + free(call_id); + if (!json_string(p, &call_id)) { + free(key); + goto item_fail; + } + } else if (!strcmp(key, "arguments")) { + free(arguments); + json_ws(p); + if (**p == '"') { + if (!json_string(p, &arguments)) { + free(key); + goto item_fail; + } + } else if (!json_raw_value(p, &arguments)) { + free(key); + goto item_fail; + } + } else if (!strcmp(key, "output")) { + free(output); + json_ws(p); + if (**p == '[') { + if (!parse_responses_content_array(p, &output)) { + free(key); + goto item_fail; + } + } else if (**p == '"') { + if (!json_string(p, &output)) { + free(key); + goto item_fail; + } + } else if (!json_raw_value(p, &output)) { + free(key); + goto item_fail; + } + } else if (!strcmp(key, "input")) { + free(input_str); + json_ws(p); + if (**p == '"') { + if (!json_string(p, &input_str)) { + free(key); + goto item_fail; + } + } else if 
(!json_raw_value(p, &input_str)) { + free(key); + goto item_fail; + } + } else if (!strcmp(key, "summary")) { + free(summary); + if (!parse_responses_content_array(p, &summary)) { + free(key); + goto item_fail; + } + } else if (!strcmp(key, "action")) { + free(action); + if (!json_raw_value(p, &action)) { + free(key); + goto item_fail; + } + } else if (!strcmp(key, "result")) { + free(result); + json_ws(p); + if (**p == '"') { + if (!json_string(p, &result)) { + free(key); + goto item_fail; + } + } else if (!json_raw_value(p, &result)) { + free(key); + goto item_fail; + } + } else if (!strcmp(key, "status")) { + free(status_str); + if (!json_string(p, &status_str)) { + free(key); + goto item_fail; + } + } else if (!strcmp(key, "tools")) { + /* tool_search_output items carry their discovered tool list + * here instead of in `output`. We keep the raw JSON so the + * downstream renderer can show the model what was found. */ + free(result); + if (!json_raw_value(p, &result)) { + free(key); + goto item_fail; + } + } else if (!json_skip_value(p)) { + free(key); + goto item_fail; + } + free(key); + json_ws(p); + if (**p == ',') (*p)++; + json_ws(p); + continue; +item_fail: + free(type); + free(role); + free(content); + free(name); + free(call_id); + free(arguments); + free(output); + free(input_str); + free(summary); + free(reasoning_content); + free(action); + free(result); + free(status_str); + buf_free(&pending_reasoning); + return false; + } + if (**p != '}') { + free(type); + free(role); + free(content); + free(name); + free(call_id); + free(arguments); + free(output); + free(input_str); + free(summary); + free(reasoning_content); + free(action); + free(result); + free(status_str); + goto fail; + } + (*p)++; + + const char *t = type ? type : "message"; + /* Replayed items must be in a terminal "completed" state. in_progress, + * incomplete, and failed all represent partial model state the client + * never confirmed — feeding them back as history would let DS4 continue + * from a tool action that never finished. Reject explicitly. */ + if (status_str && status_str[0] && + strcmp(status_str, "completed") != 0) + { + free(type); + free(role); + free(content); + free(name); + free(call_id); + free(arguments); + free(output); + free(input_str); + free(summary); + free(reasoning_content); + free(action); + free(result); + free(status_str); + buf_free(&pending_reasoning); + return false; + } + /* Three classes of items: + * 1. consumes_reasoning: assistant message / function_call / hosted-tool + * call. Attaches pending reasoning to its own assistant message. + * 2. is_bookkeeping: compaction / context_compaction etc. Semantically + * transparent — passes through without touching pending_reasoning. + * 3. everything else (user message, tool output): forces pending + * reasoning to flush in-position as an empty assistant message so it + * stays before this item in the rendered history. 
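For example, a reasoning item directly followed by a function_call attaches to that call's assistant message, while a reasoning item followed by a user message is flushed first as its own empty assistant turn.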
*/ + bool consumes_reasoning = + (!strcmp(t, "message") && role && !strcmp(role, "assistant")) || + !strcmp(t, "function_call") || !strcmp(t, "custom_tool_call") || + !strcmp(t, "local_shell_call") || !strcmp(t, "web_search_call") || + !strcmp(t, "tool_search_call") || !strcmp(t, "image_generation_call"); + bool is_bookkeeping = + !strcmp(t, "compaction") || !strcmp(t, "context_compaction"); + if (!consumes_reasoning && !is_bookkeeping && pending_reasoning.len) { + chat_msg flush_msg = {0}; + flush_msg.role = xstrdup("assistant"); + flush_msg.content = xstrdup(""); + flush_msg.reasoning = buf_take(&pending_reasoning); + chat_msgs_push(msgs, flush_msg); + } + if (!strcmp(t, "message")) { + chat_msg msg = {0}; + msg.role = xstrdup(role ? role : "user"); + msg.content = content ? content : xstrdup(""); + content = NULL; + if (!strcmp(msg.role, "assistant") && pending_reasoning.len) { + msg.reasoning = buf_take(&pending_reasoning); + } + chat_msgs_push(msgs, msg); + } else if (!strcmp(t, "function_call") || !strcmp(t, "custom_tool_call")) { + tool_call tc = {0}; + tc.id = xstrdup(call_id ? call_id : ""); + tc.name = xstrdup(name ? name : ""); + /* function_call uses `arguments` (JSON string); custom_tool_call uses + * `input` (free text). Treat both as the same on-wire argument blob — + * append_dsml_arguments_from_json will fall back to a single text param + * if the value isn't a JSON object. */ + const char *args_src = arguments ? arguments : + input_str ? input_str : "{}"; + tc.arguments = xstrdup(args_src); + /* A Responses turn that has both message text and tool calls splits + * them across separate output items; the chat template renders the + * second assistant record without an `<|Assistant|>` prefix, leaving + * the tool call bare. Merge into the previous assistant message + * when nothing user-like / tool-output-like came between them. */ + chat_msg *last = msgs->len ? &msgs->v[msgs->len - 1] : NULL; + if (last && !strcmp(last->role, "assistant")) { + if (pending_reasoning.len && (!last->reasoning || !last->reasoning[0])) { + free(last->reasoning); + last->reasoning = buf_take(&pending_reasoning); + } + tool_calls_push(&last->calls, tc); + } else { + chat_msg msg = {0}; + msg.role = xstrdup("assistant"); + msg.content = xstrdup(""); + if (pending_reasoning.len) msg.reasoning = buf_take(&pending_reasoning); + tool_calls_push(&msg.calls, tc); + chat_msgs_push(msgs, msg); + } + } else if (!strcmp(t, "function_call_output") || !strcmp(t, "custom_tool_call_output")) { + chat_msg msg = {0}; + msg.role = xstrdup("tool"); + msg.content = output ? output : xstrdup(""); + output = NULL; + if (call_id) { + msg.tool_call_id = call_id; + call_id = NULL; + } + chat_msgs_push(msgs, msg); + } else if (!strcmp(t, "reasoning")) { + /* Stash so it merges into the next assistant message. summary is the + * short-form list, content is the verbose chain. Either can be empty. */ + if (summary && summary[0]) { + if (pending_reasoning.len) buf_putc(&pending_reasoning, '\n'); + buf_puts(&pending_reasoning, summary); + } + if (content && content[0]) { + if (pending_reasoning.len) buf_putc(&pending_reasoning, '\n'); + buf_puts(&pending_reasoning, content); + } + } else if (!strcmp(t, "local_shell_call") || !strcmp(t, "web_search_call") || + !strcmp(t, "tool_search_call") || !strcmp(t, "image_generation_call")) + { + /* Hosted-tool history isn't natively supported (DS4 doesn't register + * these tools), but a Codex client may still replay them when the + * model used them in a prior turn. 
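(A replayed web_search_call, for instance, becomes a tool_call named "web_search_call" whose argument blob is its original action object.)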
Surface them as function_call + * shaped history so the next prompt retains the action that ran. */ + tool_call tc = {0}; + tc.id = xstrdup(call_id ? call_id : ""); + tc.name = xstrdup(t); + const char *args_src = action ? action : + arguments ? arguments : + input_str ? input_str : "{}"; + tc.arguments = xstrdup(args_src); + chat_msg *last = msgs->len ? &msgs->v[msgs->len - 1] : NULL; + if (last && !strcmp(last->role, "assistant")) { + if (pending_reasoning.len && (!last->reasoning || !last->reasoning[0])) { + free(last->reasoning); + last->reasoning = buf_take(&pending_reasoning); + } + tool_calls_push(&last->calls, tc); + } else { + chat_msg msg = {0}; + msg.role = xstrdup("assistant"); + msg.content = xstrdup(""); + if (pending_reasoning.len) msg.reasoning = buf_take(&pending_reasoning); + tool_calls_push(&msg.calls, tc); + chat_msgs_push(msgs, msg); + } + } else if (!strcmp(t, "local_shell_call_output") || + !strcmp(t, "web_search_call_output") || + !strcmp(t, "tool_search_output") || + !strcmp(t, "tool_search_call_output") || + !strcmp(t, "image_generation_call_output")) + { + chat_msg msg = {0}; + msg.role = xstrdup("tool"); + const char *body = output ? output : result ? result : ""; + msg.content = xstrdup(body); + if (call_id) { + msg.tool_call_id = call_id; + call_id = NULL; + } + chat_msgs_push(msgs, msg); + } else if (!is_bookkeeping) { + /* Anything we don't have an explicit branch for would silently + * drop replay context. Fail the parse instead so the client sees + * the limitation rather than ending up with stale generation + * built on an incomplete history. Only compaction/context_compaction + * (true Codex bookkeeping) are allowed to pass through silently. */ + free(type); + free(role); + free(content); + free(name); + free(call_id); + free(arguments); + free(output); + free(input_str); + free(summary); + free(reasoning_content); + free(action); + free(result); + free(status_str); + buf_free(&pending_reasoning); + return false; } + + free(type); + free(role); + free(content); + free(name); + free(call_id); + free(arguments); + free(output); + free(input_str); + free(summary); + free(reasoning_content); + free(action); + free(result); + free(status_str); + json_ws(p); + if (**p == ',') (*p)++; + json_ws(p); } - if (**p != ']') return false; + if (**p != ']') goto fail; (*p)++; + /* Trailing reasoning with no following message/tool item: attach it to an + * empty assistant message so the next turn still renders a ... + * block. Dropping it loses model state when a previous response ended with + * a reasoning-only incomplete turn and the client replays the history. */ + if (pending_reasoning.len) { + chat_msg msg = {0}; + msg.role = xstrdup("assistant"); + msg.content = xstrdup(""); + msg.reasoning = buf_take(&pending_reasoning); + chat_msgs_push(msgs, msg); + } + buf_free(&pending_reasoning); return true; +fail: + buf_free(&pending_reasoning); + return false; } -static bool parse_completion_request(ds4_engine *e, const char *body, int def_tokens, - int ctx_size, request *r, char *err, size_t errlen) { - request_init(r, REQ_COMPLETION, def_tokens); +/* Responses API has `reasoning: {"effort": "...", "summary": "..."}`. effort + * controls thinking depth; summary mode (auto/concise/detailed) controls + * whether the wire emits summary deltas at all — per the spec, no reasoning + * summary is surfaced unless the client opts in. 
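For example, reasoning: {"effort": "low", "summary": "auto"} keeps thinking on at the lowest non-zero level and opts into summary events, while reasoning: {"effort": "none"} disables thinking entirely.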
*/ +static bool parse_responses_reasoning(const char **p, ds4_think_mode *effort, + bool *summary_opted_in, + bool *effort_seen) { + json_ws(p); + if (json_lit(p, "null")) return true; + if (**p != '{') return json_skip_value(p); + (*p)++; + json_ws(p); + while (**p && **p != '}') { + char *key = NULL; + if (!json_string(p, &key)) return false; + json_ws(p); + if (**p != ':') { + free(key); + return false; + } + (*p)++; + if (!strcmp(key, "effort")) { + json_ws(p); + /* A `null` effort doesn't change thinking_enabled — it's the same + * as omitting the field. Only treat the field as a control if it + * carried an actual value. */ + if (json_lit(p, "null")) { + /* nothing */ + } else { + if (!parse_reasoning_effort_value(p, effort)) { + free(key); + return false; + } + if (effort_seen) *effort_seen = true; + } + } else if (!strcmp(key, "summary")) { + json_ws(p); + if (json_lit(p, "null")) { + /* explicit null disables summary */ + } else if (**p == '"') { + char *mode = NULL; + if (!json_string(p, &mode)) { + free(key); + return false; + } + if (summary_opted_in && + (!strcmp(mode, "auto") || + !strcmp(mode, "concise") || + !strcmp(mode, "detailed"))) + { + *summary_opted_in = true; + } + free(mode); + } else if (!json_skip_value(p)) { + free(key); + return false; + } + } else if (!json_skip_value(p)) { + free(key); + return false; + } + free(key); + json_ws(p); + if (**p == ',') (*p)++; + json_ws(p); + } + if (**p != '}') return false; + (*p)++; + return true; +} + +static bool parse_responses_request(ds4_engine *e, server *s, const char *body, int def_tokens, + int ctx_size, request *r, char *err, size_t errlen) { + request_init(r, REQ_CHAT, def_tokens); + r->api = API_RESPONSES; const char *p = body; - char *prompt = NULL; + bool got_input = false; + bool tool_choice_none = false; bool got_thinking = false; bool thinking_enabled = true; ds4_think_mode reasoning_effort = DS4_THINK_HIGH; + chat_msgs msgs = {0}; + char *instructions = NULL; + char *tool_schemas = NULL; json_ws(&p); if (*p != '{') goto bad; @@ -2403,24 +2985,276 @@ static bool parse_completion_request(ds4_engine *e, const char *body, int def_to goto bad; } p++; - if (!strcmp(key, "prompt")) { - free(prompt); - if (!parse_prompt(&p, &prompt)) { + if (!strcmp(key, "input")) { + chat_msgs_free(&msgs); + json_ws(&p); + /* Codex CLI always sends `input` as an array; tolerate bare strings + * for parity with other Responses-API callers. 
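A bare "input": "hello" is wrapped as a single user message.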
*/ + if (*p == '"') { + char *plain = NULL; + if (!json_string(&p, &plain)) { + free(key); + goto bad; + } + chat_msg msg = {0}; + msg.role = xstrdup("user"); + msg.content = plain; + chat_msgs_push(&msgs, msg); + } else if (!parse_responses_input(&p, &msgs)) { free(key); goto bad; } - } else if (!strcmp(key, "model")) { - free(r->model); - if (!json_string(&p, &r->model)) { + got_input = true; + } else if (!strcmp(key, "instructions")) { + free(instructions); + instructions = NULL; + json_ws(&p); + if (json_lit(&p, "null")) { + instructions = xstrdup(""); + } else if (!json_string(&p, &instructions)) { free(key); goto bad; } - } else if (!strcmp(key, "max_tokens")) { - if (!json_int(&p, &r->max_tokens)) { + } else if (!strcmp(key, "tools")) { + free(tool_schemas); + tool_schemas = NULL; + if (!parse_tools_value(&p, &tool_schemas, &r->tool_orders)) { free(key); goto bad; } - } else if (!strcmp(key, "temperature")) { + } else if (!strcmp(key, "tool_choice")) { + json_ws(&p); + if (*p == '"') { + char *choice = NULL; + if (!json_string(&p, &choice)) { + free(key); + goto bad; + } + /* DS4 honours "none" (disable tools) and "auto" (model decides). + * "required" and explicit function targets need constrained + * decoding we don't implement — reject so clients see the + * limitation instead of silently downgrading to auto. */ + if (!strcmp(choice, "none")) { + tool_choice_none = true; + } else if (strcmp(choice, "auto") != 0) { + snprintf(err, errlen, "tool_choice=%s not supported", choice); + free(choice); + free(key); + chat_msgs_free(&msgs); + free(instructions); + free(tool_schemas); + request_free(r); + return false; + } + free(choice); + } else if (*p == '{') { + snprintf(err, errlen, "forced tool_choice not supported"); + free(key); + chat_msgs_free(&msgs); + free(instructions); + free(tool_schemas); + request_free(r); + return false; + } else if (!json_skip_value(&p)) { + free(key); + goto bad; + } + } else if (!strcmp(key, "model")) { + free(r->model); + if (!json_string(&p, &r->model)) { + free(key); + goto bad; + } + } else if (!strcmp(key, "max_output_tokens") || !strcmp(key, "max_tokens")) { + if (!json_int(&p, &r->max_tokens)) { + free(key); + goto bad; + } + } else if (!strcmp(key, "temperature")) { + double v = 0.0; + if (!json_number(&p, &v)) { + free(key); + goto bad; + } + r->temperature = (float)v; + } else if (!strcmp(key, "top_p")) { + double v = 0.0; + if (!json_number(&p, &v)) { + free(key); + goto bad; + } + r->top_p = (float)v; + } else if (!strcmp(key, "stream")) { + if (!json_bool(&p, &r->stream)) { + free(key); + goto bad; + } + } else if (!strcmp(key, "reasoning")) { + bool effort_seen = false; + if (!parse_responses_reasoning(&p, &reasoning_effort, + &r->reasoning_summary_emit, + &effort_seen)) { + free(key); + goto bad; + } + /* Only an explicit effort value counts as the client opting into + * thinking control. summary alone, or `reasoning: null`, leaves the + * default behaviour (and the model_alias_* fallbacks below) intact. */ + if (effort_seen) { + got_thinking = true; + /* Responses-API effort of "minimal" / "none" maps to disabled + * thinking. Other effort values choose between HIGH and MAX. */ + if (reasoning_effort == DS4_THINK_NONE) thinking_enabled = false; + } + } else if (!strcmp(key, "previous_response_id") || + !strcmp(key, "conversation")) + { + /* DS4 doesn't persist conversation state across requests — the + * client is expected to replay the full input every turn. 
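(An explicit null for either field is accepted and ignored.)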
A + * non-null previous_response_id / conversation reference would + * silently answer with truncated history; reject explicitly. */ + json_ws(&p); + if (!json_lit(&p, "null")) { + snprintf(err, errlen, + "%s is not supported; replay full input instead", + key); + free(key); + chat_msgs_free(&msgs); + free(instructions); + free(tool_schemas); + request_free(r); + return false; + } + } else if (!json_skip_value(&p)) { + free(key); + goto bad; + } + free(key); + json_ws(&p); + if (*p == ',') p++; + json_ws(&p); + } + if (*p != '}') goto bad; + if (!got_input) { + snprintf(err, errlen, "missing input"); + chat_msgs_free(&msgs); + free(instructions); + free(tool_schemas); + request_free(r); + return false; + } + /* instructions in the Responses API replaces any system message — for Codex + * it carries the full agent system prompt. Prepend it so render produces a + * standard system+chat layout. */ + if (instructions && instructions[0]) { + chat_msg msg = {0}; + msg.role = xstrdup("system"); + msg.content = instructions; + instructions = NULL; + /* Insert at the head so it precedes the conversation. */ + chat_msgs_push(&msgs, msg); + if (msgs.len > 1) { + chat_msg tmp = msgs.v[msgs.len - 1]; + for (int i = msgs.len - 1; i > 0; i--) msgs.v[i] = msgs.v[i - 1]; + msgs.v[0] = tmp; + } + } + r->has_tools = tool_schemas && tool_schemas[0] && !tool_choice_none; + if (!got_thinking && model_alias_disables_thinking(r->model)) thinking_enabled = false; + if (!got_thinking && model_alias_enables_thinking(r->model)) thinking_enabled = true; + r->think_mode = ds4_think_mode_for_context( + think_mode_from_enabled(thinking_enabled, reasoning_effort), ctx_size); + kv_cache_restore_tool_memory_for_messages(s, &msgs); + tool_memory_attach_to_messages(s, &msgs, &r->tool_replay); + const char *active_tool_schemas = r->has_tools ? 
tool_schemas : NULL; + r->prompt_preserves_reasoning = + chat_history_uses_tool_context(&msgs, active_tool_schemas); + r->prompt_text = render_chat_prompt_text(&msgs, active_tool_schemas, + &r->tool_orders, r->think_mode); + ds4_tokenize_rendered_chat(e, r->prompt_text, &r->prompt); + chat_msgs_free(&msgs); + free(instructions); + free(tool_schemas); + return true; +bad: + chat_msgs_free(&msgs); + free(instructions); + free(tool_schemas); + snprintf(err, errlen, "invalid JSON request"); + request_free(r); + return false; +} + +static bool parse_prompt(const char **p, char **out) { + json_ws(p); + if (**p == '"') return json_string(p, out); + if (**p != '[') { + if (!json_skip_value(p)) return false; + *out = xstrdup(""); + return true; + } + (*p)++; + json_ws(p); + if (**p == '"') { + if (!json_string(p, out)) return false; + } else { + *out = xstrdup(""); + if (**p && **p != ']' && !json_skip_value(p)) return false; + } + while (**p && **p != ']') { + json_ws(p); + if (**p == ',') { + (*p)++; + if (!json_skip_value(p)) return false; + } else { + break; + } + } + if (**p != ']') return false; + (*p)++; + return true; +} + +static bool parse_completion_request(ds4_engine *e, const char *body, int def_tokens, + int ctx_size, request *r, char *err, size_t errlen) { + request_init(r, REQ_COMPLETION, def_tokens); + const char *p = body; + char *prompt = NULL; + bool got_thinking = false; + bool thinking_enabled = true; + ds4_think_mode reasoning_effort = DS4_THINK_HIGH; + + json_ws(&p); + if (*p != '{') goto bad; + p++; + json_ws(&p); + while (*p && *p != '}') { + char *key = NULL; + if (!json_string(&p, &key)) goto bad; + json_ws(&p); + if (*p != ':') { + free(key); + goto bad; + } + p++; + if (!strcmp(key, "prompt")) { + free(prompt); + if (!parse_prompt(&p, &prompt)) { + free(key); + goto bad; + } + } else if (!strcmp(key, "model")) { + free(r->model); + if (!json_string(&p, &r->model)) { + free(key); + goto bad; + } + } else if (!strcmp(key, "max_tokens")) { + if (!json_int(&p, &r->max_tokens)) { + free(key); + goto bad; + } + } else if (!strcmp(key, "temperature")) { double v = 0.0; if (!json_number(&p, &v)) { free(key); @@ -3919,134 +4753,785 @@ static bool openai_tool_start_invoke(int fd, server *s, const request *r, const return true; } -static bool openai_tool_start_param(int fd, const request *r, const char *id, - openai_tool_stream *ts, - const char *raw, size_t raw_len) { - const char *tag_end = memchr(raw + ts->parse_pos, '>', raw_len - ts->parse_pos); - if (!tag_end) return true; - char *tag = xstrndup(raw + ts->parse_pos, (size_t)(tag_end - (raw + ts->parse_pos) + 1)); - char *name = dsml_attr(tag, "name"); - char *is_string = dsml_attr(tag, "string"); - free(tag); - if (!name || !is_string) { - free(name); - free(is_string); - return openai_tool_stream_fail(ts); +static bool openai_tool_start_param(int fd, const request *r, const char *id, + openai_tool_stream *ts, + const char *raw, size_t raw_len) { + const char *tag_end = memchr(raw + ts->parse_pos, '>', raw_len - ts->parse_pos); + if (!tag_end) return true; + char *tag = xstrndup(raw + ts->parse_pos, (size_t)(tag_end - (raw + ts->parse_pos) + 1)); + char *name = dsml_attr(tag, "name"); + char *is_string = dsml_attr(tag, "string"); + free(tag); + if (!name || !is_string) { + free(name); + free(is_string); + return openai_tool_stream_fail(ts); + } + bool string_value = !strcmp(is_string, "true"); + bool ok = openai_tool_emit_param_prefix(fd, r, id, ts, name, string_value); + free(name); + free(is_string); + if (!ok) return 
false; + + ts->param_is_string = string_value; + ts->parse_pos = (size_t)(tag_end - raw) + 1; + ts->state = OPENAI_TOOL_PARAM_VALUE; + return true; +} + +static bool openai_tool_finish_param(int fd, const request *r, const char *id, + openai_tool_stream *ts, + const char *raw, size_t value_end) { + if (value_end > ts->parse_pos) { + bool ok = ts->param_is_string ? + openai_tool_emit_string_value(fd, r, id, ts, raw + ts->parse_pos, + value_end - ts->parse_pos) : + openai_tool_emit_args_fragment(fd, r, id, ts, raw + ts->parse_pos, + value_end - ts->parse_pos); + if (!ok) return false; + } + if (ts->param_is_string && + !openai_tool_emit_args_fragment(fd, r, id, ts, "\"", 1)) return false; + ts->parse_pos = value_end + strlen(ts->param_end); + ts->state = OPENAI_TOOL_BETWEEN_PARAMS; + return true; +} + +static bool openai_tool_stream_update(int fd, server *s, const request *r, const char *id, + openai_tool_stream *ts, + const char *raw, size_t raw_len) { + while (ts->active && ts->parse_pos < raw_len) { + if (ts->state == OPENAI_TOOL_BETWEEN_INVOKES) { + while (ts->parse_pos < raw_len && isspace((unsigned char)raw[ts->parse_pos])) ts->parse_pos++; + if (ts->parse_pos >= raw_len) return true; + if (raw_full_lit(raw, raw_len, ts->parse_pos, ts->tool_calls_end)) { + ts->parse_pos += strlen(ts->tool_calls_end); + ts->active = false; + ts->state = OPENAI_TOOL_DONE; + return true; + } + if (raw_partial_any(raw, raw_len, ts->parse_pos, ts->tool_calls_end, ts->invoke_start)) return true; + if (raw_full_lit(raw, raw_len, ts->parse_pos, ts->invoke_start)) { + size_t before_pos = ts->parse_pos; + openai_tool_stream_state before_state = ts->state; + if (!openai_tool_start_invoke(fd, s, r, id, ts, raw, raw_len)) return false; + if (ts->parse_pos == before_pos && ts->state == before_state) return true; + continue; + } + return openai_tool_stream_fail(ts); + } + + if (ts->state == OPENAI_TOOL_BETWEEN_PARAMS) { + while (ts->parse_pos < raw_len && isspace((unsigned char)raw[ts->parse_pos])) ts->parse_pos++; + if (ts->parse_pos >= raw_len) return true; + if (raw_full_lit(raw, raw_len, ts->parse_pos, ts->invoke_end)) { + if (ts->args_open && + !openai_tool_emit_args_fragment(fd, r, id, ts, "}", 1)) return false; + ts->args_open = false; + ts->parse_pos += strlen(ts->invoke_end); + ts->index++; + ts->state = OPENAI_TOOL_BETWEEN_INVOKES; + continue; + } + if (raw_partial_any(raw, raw_len, ts->parse_pos, ts->invoke_end, ts->param_start)) return true; + if (raw_full_lit(raw, raw_len, ts->parse_pos, ts->param_start)) { + size_t before_pos = ts->parse_pos; + openai_tool_stream_state before_state = ts->state; + if (!openai_tool_start_param(fd, r, id, ts, raw, raw_len)) return false; + if (ts->parse_pos == before_pos && ts->state == before_state) return true; + continue; + } + return openai_tool_stream_fail(ts); + } + + if (ts->state == OPENAI_TOOL_PARAM_VALUE) { + const char *end = find_lit_bounded(raw + ts->parse_pos, + raw_len - ts->parse_pos, + ts->param_end); + if (end) { + if (!openai_tool_finish_param(fd, r, id, ts, raw, + (size_t)(end - raw))) return false; + continue; + } + size_t limit = tool_param_value_stream_safe_len(raw, ts->parse_pos, + raw_len, ts->param_end, + ts->param_is_string); + if (limit > ts->parse_pos) { + bool ok = ts->param_is_string ? 
+ openai_tool_emit_string_value(fd, r, id, ts, raw + ts->parse_pos, + limit - ts->parse_pos) : + openai_tool_emit_args_fragment(fd, r, id, ts, raw + ts->parse_pos, + limit - ts->parse_pos); + if (!ok) return false; + ts->parse_pos = limit; + } + return true; + } + + return true; + } + return true; +} + +static bool openai_sse_stream_update(int fd, server *s, const request *r, const char *id, + openai_stream *st, + const char *raw, size_t raw_len, + bool final) { + if (!st->active || !raw) return true; + + if (st->mode == OPENAI_STREAM_THINKING) { + if (!st->checked_think_prefix) { + const char *open = ""; + const size_t open_len = strlen(open); + if (raw_len < open_len && !strncmp(raw, open, raw_len) && !final) { + return true; + } + if (raw_len >= open_len && !strncmp(raw, open, open_len)) { + st->emit_pos = open_len; + } + st->checked_think_prefix = true; + } + + const char *close = strstr(raw + st->emit_pos, ""); + size_t limit; + if (close) { + limit = (size_t)(close - raw); + } else if (final) { + limit = raw_len; + } else { + const size_t hold = strlen("") - 1; + limit = raw_len > hold ? raw_len - hold : st->emit_pos; + limit = utf8_stream_safe_len(raw, st->emit_pos, limit, false); + } + + if (limit > st->emit_pos) { + if (!sse_chat_delta_n(fd, r, id, "reasoning_content", + raw + st->emit_pos, + limit - st->emit_pos)) return false; + st->sent_reasoning = true; + st->emit_pos = limit; + } + + if (close) { + st->emit_pos = (size_t)(close - raw) + strlen(""); + st->mode = OPENAI_STREAM_TEXT; + } else if (final) { + st->mode = OPENAI_STREAM_SUPPRESS; + return true; + } else { + return true; + } + } + + if (st->mode == OPENAI_STREAM_TEXT) { + const char *tool = r->has_tools ? find_any_tool_start(raw + st->emit_pos) : NULL; + size_t limit = text_stream_safe_limit(raw, st->emit_pos, raw_len, + r->has_tools, final); + + if (limit > st->emit_pos) { + if (!sse_chat_delta_n(fd, r, id, "content", + raw + st->emit_pos, + limit - st->emit_pos)) return false; + st->sent_content = true; + st->emit_pos = limit; + } + + if (tool) { + st->emit_pos = (size_t)(tool - raw); + if (openai_tool_stream_init(&st->tool, raw, raw_len, st->emit_pos)) { + st->mode = OPENAI_STREAM_TOOL; + } else { + st->mode = OPENAI_STREAM_SUPPRESS; + } + } else if (final) { + st->mode = OPENAI_STREAM_SUPPRESS; + } + } + + if (st->mode == OPENAI_STREAM_TOOL) { + if (!openai_tool_stream_update(fd, s, r, id, &st->tool, raw, raw_len)) return false; + if (!st->tool.active) st->mode = OPENAI_STREAM_SUPPRESS; + } + return true; +} + +static bool openai_sse_finish_live(int fd, server *s, const request *r, const char *id, + openai_stream *st, const char *raw, + size_t raw_len, const tool_calls *calls, + const char *finish, int prompt_tokens, + int completion_tokens) { + if (!openai_sse_stream_update(fd, s, r, id, st, raw, raw_len, true)) return false; + + buf b = {0}; + long now = (long)time(NULL); + if (calls && calls->len && !st->tool.emitted_any) { + buf_printf(&b, "data: {\"id\":\"%s\",\"object\":\"chat.completion.chunk\",\"created\":%ld,\"model\":", id, now); + json_escape(&b, r->model); + buf_puts(&b, ",\"choices\":[{\"index\":0,\"delta\":{\"tool_calls\":"); + append_tool_call_deltas_json(&b, calls, id, &r->tool_orders); + buf_puts(&b, "},\"finish_reason\":null}]}\n\n"); + } + buf_printf(&b, "data: {\"id\":\"%s\",\"object\":\"chat.completion.chunk\",\"created\":%ld,\"model\":", id, now); + json_escape(&b, r->model); + buf_puts(&b, ",\"choices\":[{\"index\":0,\"delta\":{},\"finish_reason\":"); + json_escape(&b, finish); + 
buf_puts(&b, "}]}\n\n"); + + bool ok = send_all(fd, b.ptr, b.len) && + sse_done(fd, r, id, prompt_tokens, completion_tokens); + buf_free(&b); + return ok; +} + +static bool request_uses_openai_live_stream(const request *r) { + return r->stream && r->api == API_OPENAI && r->kind == REQ_CHAT; +} + +static bool request_uses_responses_live_stream(const request *r) { + return r->stream && r->api == API_RESPONSES && r->kind == REQ_CHAT; +} + +static bool request_uses_structured_stream(const request *r) { + return r->stream && (r->api == API_ANTHROPIC || + r->api == API_RESPONSES || + request_uses_openai_live_stream(r)); +} + +/* Codex' Responses API uses 24-hex suffixes for response/item ids. Prefix + * controls the variant (resp_, rs_, msg_, fc_) so each event references a + * stable identifier across output_item.added / .done. */ +static void responses_random_id(char *dst, size_t dstlen, const char *prefix) { + unsigned char bytes[12]; + size_t pos = snprintf(dst, dstlen, "%s", prefix); + if (pos >= dstlen) return; + static uint64_t fallback_ctr; + if (!random_bytes(bytes, sizeof(bytes))) { + uint64_t a = ((uint64_t)time(NULL) << 32) ^ (uint64_t)getpid(); + uint64_t b = ++fallback_ctr ^ (uint64_t)(uintptr_t)dst; + memcpy(bytes, &a, sizeof(a)); + memcpy(bytes + sizeof(a), &b, sizeof(uint32_t)); + } + static const char hex[] = "0123456789abcdef"; + for (size_t i = 0; i < sizeof(bytes) && pos + 2 < dstlen; i++) { + dst[pos++] = hex[bytes[i] >> 4]; + dst[pos++] = hex[bytes[i] & 15]; + } + dst[pos] = '\0'; +} + +typedef enum { + RESP_STREAM_THINKING, + RESP_STREAM_TEXT, + RESP_STREAM_SUPPRESS, +} responses_stream_mode; + +typedef struct { + responses_stream_mode mode; + size_t emit_pos; + bool active; + bool checked_think_prefix; + bool reasoning_item_opened; + bool reasoning_item_closed; + bool reasoning_summary_started; + bool reasoning_closed_naturally; + bool message_item_opened; + bool message_text_part_open; + bool message_item_closed; + bool reasoning_emitted_any; + bool message_emitted_any; + buf reasoning_text; + buf message_text; + char response_id[40]; + char reasoning_id[40]; + char message_id[40]; + int reasoning_index; /* output_index of the reasoning item (0 if present) */ + int message_index; /* output_index of the assistant message item */ + int next_output_index; /* monotonic counter for upcoming output items */ + int sequence; /* monotonic per-event sequence_number Codex consumes */ +} responses_stream; + +static void responses_stream_init(const request *r, responses_stream *st) { + memset(st, 0, sizeof(*st)); + st->mode = ds4_think_mode_enabled(r->think_mode) ? RESP_STREAM_THINKING : RESP_STREAM_TEXT; + responses_random_id(st->response_id, sizeof(st->response_id), "resp_"); + responses_random_id(st->reasoning_id, sizeof(st->reasoning_id), "rs_"); + responses_random_id(st->message_id, sizeof(st->message_id), "msg_"); + st->reasoning_index = -1; + st->message_index = -1; +} + +static void responses_stream_free(responses_stream *st) { + if (!st) return; + buf_free(&st->reasoning_text); + buf_free(&st->message_text); +} + +/* Codex parses an explicit sequence_number on every Responses event for + * ordering and reconnect resilience. We inject it after the `{"type":...` head + * so emitters can stay readable while still producing the wire shape Codex + * expects. */ +static bool responses_sse_emit_event(int fd, responses_stream *st, const char *body) { + buf b = {0}; + buf_puts(&b, "data: "); + /* body always starts with `{"type":"..."`. 
We splice in sequence_number + * after the closing quote of that string so every event has it as the + * second field. */ + const char *type_close = NULL; + if (body[0] == '{') { + const char *p = body + 1; + /* Skip the literal `"type":` then the value string. */ + if (!strncmp(p, "\"type\":\"", 8)) { + const char *q = p + 8; + while (*q && *q != '"') { + if (*q == '\\' && q[1]) q += 2; + else q++; + } + if (*q == '"') type_close = q + 1; + } + } + if (type_close) { + size_t head_len = (size_t)(type_close - body); + buf_append(&b, body, head_len); + buf_printf(&b, ",\"sequence_number\":%d", st->sequence++); + buf_puts(&b, type_close); + } else { + buf_puts(&b, body); + } + buf_puts(&b, "\n\n"); + bool ok = send_all(fd, b.ptr, b.len); + buf_free(&b); + return ok; +} + +static bool responses_sse_created(int fd, const request *r, responses_stream *st, + long created_at) { + buf b = {0}; + buf_printf(&b, + "{\"type\":\"response.created\",\"response\":{\"id\":\"%s\"," + "\"object\":\"response\",\"created_at\":%ld,\"status\":\"in_progress\"," + "\"model\":", st->response_id, created_at); + json_escape(&b, r->model); + buf_puts(&b, ",\"output\":[]}}"); + bool ok = responses_sse_emit_event(fd, st, b.ptr); + buf_free(&b); + return ok; +} + +static bool responses_sse_reasoning_added(int fd, responses_stream *st) { + buf b = {0}; + buf_printf(&b, + "{\"type\":\"response.output_item.added\",\"output_index\":%d," + "\"item\":{\"id\":\"%s\",\"type\":\"reasoning\",\"status\":\"in_progress\"," + "\"summary\":[]}}", + st->reasoning_index, st->reasoning_id); + bool ok = responses_sse_emit_event(fd, st, b.ptr); + buf_free(&b); + return ok; +} + +static bool responses_sse_reasoning_summary_part_added(int fd, responses_stream *st) { + buf b = {0}; + buf_printf(&b, + "{\"type\":\"response.reasoning_summary_part.added\"," + "\"item_id\":\"%s\",\"output_index\":%d,\"summary_index\":0," + "\"part\":{\"type\":\"summary_text\",\"text\":\"\"}}", + st->reasoning_id, st->reasoning_index); + bool ok = responses_sse_emit_event(fd, st, b.ptr); + buf_free(&b); + return ok; +} + +static bool responses_sse_reasoning_delta(int fd, responses_stream *st, + const char *text, size_t len) { + if (len == 0) return true; + buf b = {0}; + buf_printf(&b, + "{\"type\":\"response.reasoning_summary_text.delta\"," + "\"item_id\":\"%s\",\"output_index\":%d,\"summary_index\":0,\"delta\":", + st->reasoning_id, st->reasoning_index); + json_escape_n(&b, text, len); + buf_putc(&b, '}'); + bool ok = responses_sse_emit_event(fd, st, b.ptr); + buf_free(&b); + return ok; +} + +static const char *responses_item_status_for_finish(const char *finish) { + if (finish && (!strcmp(finish, "length") || !strcmp(finish, "error"))) return "incomplete"; + return "completed"; +} + +static bool responses_sse_reasoning_done(int fd, responses_stream *st, + const char *finish) { + /* If the stream terminates before `` was actually observed the + * reasoning item is partial — regardless of why generation stopped (EOS, + * stop sequence, tool_calls, length, error). Force the item to incomplete + * so a client replay rejects it instead of feeding unfinished hidden state + * back as completed history. */ + (void)finish; + const char *item_status = + st->reasoning_closed_naturally ? "completed" : "incomplete"; + /* Mirror the message-item close sequence: emit summary_text.done + + * summary_part.done before the output_item.done so clients that key off + * part lifecycle don't see a dangling open summary part. 
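The resulting order is reasoning_summary_text.done, then reasoning_summary_part.done (only when a summary part was opened), then output_item.done.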
*/ + buf b = {0}; + buf_printf(&b, + "{\"type\":\"response.reasoning_summary_text.done\"," + "\"item_id\":\"%s\",\"output_index\":%d,\"summary_index\":0,\"text\":", + st->reasoning_id, st->reasoning_index); + json_escape_n(&b, st->reasoning_text.ptr ? st->reasoning_text.ptr : "", + st->reasoning_text.len); + buf_putc(&b, '}'); + bool ok = responses_sse_emit_event(fd, st, b.ptr); + if (!ok) { + buf_free(&b); + return false; + } + + if (st->reasoning_summary_started) { + buf_free(&b); + buf_printf(&b, + "{\"type\":\"response.reasoning_summary_part.done\"," + "\"item_id\":\"%s\",\"output_index\":%d,\"summary_index\":0," + "\"part\":{\"type\":\"summary_text\",\"text\":", + st->reasoning_id, st->reasoning_index); + json_escape_n(&b, st->reasoning_text.ptr ? st->reasoning_text.ptr : "", + st->reasoning_text.len); + buf_puts(&b, "}}"); + ok = responses_sse_emit_event(fd, st, b.ptr); + if (!ok) { + buf_free(&b); + return false; + } + } + + buf_free(&b); + buf_printf(&b, + "{\"type\":\"response.output_item.done\",\"output_index\":%d," + "\"item\":{\"id\":\"%s\",\"type\":\"reasoning\",\"status\":\"%s\",\"summary\":[", + st->reasoning_index, st->reasoning_id, item_status); + if (st->reasoning_text.len) { + buf_puts(&b, "{\"type\":\"summary_text\",\"text\":"); + json_escape_n(&b, st->reasoning_text.ptr, st->reasoning_text.len); + buf_putc(&b, '}'); + } + buf_puts(&b, "]}}"); + ok = responses_sse_emit_event(fd, st, b.ptr); + buf_free(&b); + return ok; +} + +static bool responses_sse_message_added(int fd, responses_stream *st) { + buf b = {0}; + buf_printf(&b, + "{\"type\":\"response.output_item.added\",\"output_index\":%d," + "\"item\":{\"id\":\"%s\",\"type\":\"message\",\"status\":\"in_progress\"," + "\"role\":\"assistant\",\"content\":[]}}", + st->message_index, st->message_id); + bool ok = responses_sse_emit_event(fd, st, b.ptr); + buf_free(&b); + return ok; +} + +static bool responses_sse_message_text_part_added(int fd, responses_stream *st) { + buf b = {0}; + buf_printf(&b, + "{\"type\":\"response.content_part.added\"," + "\"item_id\":\"%s\",\"output_index\":%d,\"content_index\":0," + "\"part\":{\"type\":\"output_text\",\"text\":\"\",\"annotations\":[]}}", + st->message_id, st->message_index); + bool ok = responses_sse_emit_event(fd, st, b.ptr); + buf_free(&b); + return ok; +} + +static bool responses_sse_output_text_delta(int fd, responses_stream *st, + const char *text, size_t len) { + if (len == 0) return true; + buf b = {0}; + buf_printf(&b, + "{\"type\":\"response.output_text.delta\"," + "\"item_id\":\"%s\",\"output_index\":%d,\"content_index\":0,\"delta\":", + st->message_id, st->message_index); + json_escape_n(&b, text, len); + buf_putc(&b, '}'); + bool ok = responses_sse_emit_event(fd, st, b.ptr); + buf_free(&b); + return ok; +} + +static bool responses_sse_message_done(int fd, responses_stream *st, + const char *finish) { + const char *item_status = responses_item_status_for_finish(finish); + buf b = {0}; + buf_printf(&b, + "{\"type\":\"response.output_text.done\"," + "\"item_id\":\"%s\",\"output_index\":%d,\"content_index\":0,\"text\":", + st->message_id, st->message_index); + json_escape_n(&b, st->message_text.ptr ? 
st->message_text.ptr : "", + st->message_text.len); + buf_putc(&b, '}'); + bool ok = responses_sse_emit_event(fd, st, b.ptr); + if (!ok) { + buf_free(&b); + return false; + } + + buf_free(&b); + buf_printf(&b, + "{\"type\":\"response.content_part.done\"," + "\"item_id\":\"%s\",\"output_index\":%d,\"content_index\":0," + "\"part\":{\"type\":\"output_text\",\"text\":", + st->message_id, st->message_index); + json_escape_n(&b, st->message_text.ptr ? st->message_text.ptr : "", + st->message_text.len); + buf_puts(&b, ",\"annotations\":[]}}"); + ok = responses_sse_emit_event(fd, st, b.ptr); + if (!ok) { + buf_free(&b); + return false; + } + + buf_free(&b); + buf_printf(&b, + "{\"type\":\"response.output_item.done\",\"output_index\":%d," + "\"item\":{\"id\":\"%s\",\"type\":\"message\",\"status\":\"%s\"," + "\"role\":\"assistant\",\"content\":[{\"type\":\"output_text\",\"text\":", + st->message_index, st->message_id, item_status); + json_escape_n(&b, st->message_text.ptr ? st->message_text.ptr : "", + st->message_text.len); + buf_puts(&b, ",\"annotations\":[]}]}}"); + ok = responses_sse_emit_event(fd, st, b.ptr); + buf_free(&b); + return ok; +} + +/* Item identity per tool call must be stable across added/done/completed. */ +typedef struct { + char fc_id[40]; + char call_id[64]; + bool is_custom; + int output_index; +} responses_tool_item; + +/* The internal tool_call doesn't track whether it came from a function_call or + * a custom_tool_call (or what tool kind is registered). For round-trip + * correctness with the rare custom_tool_call clients, we preserve any provided + * call_id verbatim and pre-assign a stable fc_id; the discriminator currently + * defaults to function_call because Codex CLI registers all its tools as + * function tools. */ +static void responses_tool_items_build(responses_tool_item **out, + const tool_calls *calls, + int starting_output_index) { + *out = NULL; + if (!calls || calls->len == 0) return; + responses_tool_item *items = xmalloc((size_t)calls->len * sizeof(*items)); + for (int i = 0; i < calls->len; i++) { + memset(&items[i], 0, sizeof(items[i])); + responses_random_id(items[i].fc_id, sizeof(items[i].fc_id), "fc_"); + if (calls->v[i].id && calls->v[i].id[0]) { + snprintf(items[i].call_id, sizeof(items[i].call_id), "%s", calls->v[i].id); + } else { + responses_random_id(items[i].call_id, sizeof(items[i].call_id), "call_"); + } + items[i].is_custom = false; + items[i].output_index = starting_output_index + i; + } + *out = items; +} + +static void responses_append_function_call_item(buf *b, const tool_call *tc, + const responses_tool_item *item, + const char *item_status, + bool with_args) { + const char *item_type = item->is_custom ? "custom_tool_call" : "function_call"; + const char *body_field = item->is_custom ? "input" : "arguments"; + buf_printf(b, + "{\"id\":\"%s\",\"type\":\"%s\",\"status\":\"%s\",\"name\":", + item->fc_id, item_type, item_status); + json_escape(b, tc->name ? tc->name : ""); + buf_puts(b, ",\"call_id\":"); + json_escape(b, item->call_id); + buf_printf(b, ",\"%s\":", body_field); + if (!with_args) { + buf_puts(b, "\"\""); + } else if (item->is_custom) { + json_escape(b, tc->arguments ? 
tc->arguments : ""); + } else { + append_json_object_string(b, tc->arguments); } - bool string_value = !strcmp(is_string, "true"); - bool ok = openai_tool_emit_param_prefix(fd, r, id, ts, name, string_value); - free(name); - free(is_string); - if (!ok) return false; + buf_putc(b, '}'); +} - ts->param_is_string = string_value; - ts->parse_pos = (size_t)(tag_end - raw) + 1; - ts->state = OPENAI_TOOL_PARAM_VALUE; - return true; +static bool responses_sse_function_call_event(int fd, responses_stream *st, + const tool_call *tc, + const responses_tool_item *item, + const char *finish, + bool done) { + /* The added event marks a tool call as in_progress per the Responses + * lifecycle; only output_item.done (and the terminal response output) + * carry the final completed / incomplete status. The added item ships with + * an empty arguments string so clients that accumulate via + * function_call_arguments.delta + .done don't end up with doubled JSON. */ + const char *item_status = done ? responses_item_status_for_finish(finish) : "in_progress"; + buf b = {0}; + buf_printf(&b, + "{\"type\":\"response.output_item.%s\",\"output_index\":%d,\"item\":", + done ? "done" : "added", item->output_index); + responses_append_function_call_item(&b, tc, item, item_status, done); + buf_putc(&b, '}'); + bool ok = responses_sse_emit_event(fd, st, b.ptr); + buf_free(&b); + return ok; } -static bool openai_tool_finish_param(int fd, const request *r, const char *id, - openai_tool_stream *ts, - const char *raw, size_t value_end) { - if (value_end > ts->parse_pos) { - bool ok = ts->param_is_string ? - openai_tool_emit_string_value(fd, r, id, ts, raw + ts->parse_pos, - value_end - ts->parse_pos) : - openai_tool_emit_args_fragment(fd, r, id, ts, raw + ts->parse_pos, - value_end - ts->parse_pos); - if (!ok) return false; +/* Stream function-call arguments as a single delta + done, since DS4 generates + * the whole DSML invoke as one unit before the worker decides which tool was + * called. Clients that follow the OpenAI Responses lifecycle expect both + * events between output_item.added (in_progress) and output_item.done. */ +static bool responses_sse_function_call_arguments_done(int fd, responses_stream *st, + const tool_call *tc, + const responses_tool_item *item) { + if (item->is_custom) return true; + buf args = {0}; + append_json_object_string(&args, tc->arguments); + buf b = {0}; + buf_printf(&b, + "{\"type\":\"response.function_call_arguments.delta\"," + "\"item_id\":\"%s\",\"output_index\":%d,\"delta\":", + item->fc_id, item->output_index); + buf_append(&b, args.ptr ? args.ptr : "\"\"", args.ptr ? args.len : 2); + buf_putc(&b, '}'); + bool ok = responses_sse_emit_event(fd, st, b.ptr); + if (!ok) { + buf_free(&b); + buf_free(&args); + return false; } - if (ts->param_is_string && - !openai_tool_emit_args_fragment(fd, r, id, ts, "\"", 1)) return false; - ts->parse_pos = value_end + strlen(ts->param_end); - ts->state = OPENAI_TOOL_BETWEEN_PARAMS; - return true; + + buf_free(&b); + buf_printf(&b, + "{\"type\":\"response.function_call_arguments.done\"," + "\"item_id\":\"%s\",\"output_index\":%d,\"name\":", + item->fc_id, item->output_index); + json_escape(&b, tc->name ? tc->name : ""); + buf_puts(&b, ",\"arguments\":"); + buf_append(&b, args.ptr ? args.ptr : "\"\"", args.ptr ? 
args.len : 2); + buf_putc(&b, '}'); + ok = responses_sse_emit_event(fd, st, b.ptr); + buf_free(&b); + buf_free(&args); + return ok; } -static bool openai_tool_stream_update(int fd, server *s, const request *r, const char *id, - openai_tool_stream *ts, - const char *raw, size_t raw_len) { - while (ts->active && ts->parse_pos < raw_len) { - if (ts->state == OPENAI_TOOL_BETWEEN_INVOKES) { - while (ts->parse_pos < raw_len && isspace((unsigned char)raw[ts->parse_pos])) ts->parse_pos++; - if (ts->parse_pos >= raw_len) return true; - if (raw_full_lit(raw, raw_len, ts->parse_pos, ts->tool_calls_end)) { - ts->parse_pos += strlen(ts->tool_calls_end); - ts->active = false; - ts->state = OPENAI_TOOL_DONE; - return true; - } - if (raw_partial_any(raw, raw_len, ts->parse_pos, ts->tool_calls_end, ts->invoke_start)) return true; - if (raw_full_lit(raw, raw_len, ts->parse_pos, ts->invoke_start)) { - size_t before_pos = ts->parse_pos; - openai_tool_stream_state before_state = ts->state; - if (!openai_tool_start_invoke(fd, s, r, id, ts, raw, raw_len)) return false; - if (ts->parse_pos == before_pos && ts->state == before_state) return true; - continue; - } - return openai_tool_stream_fail(ts); - } +static const char *responses_status_for_finish(const char *finish) { + if (finish && !strcmp(finish, "length")) return "incomplete"; + if (finish && !strcmp(finish, "error")) return "failed"; + return "completed"; +} - if (ts->state == OPENAI_TOOL_BETWEEN_PARAMS) { - while (ts->parse_pos < raw_len && isspace((unsigned char)raw[ts->parse_pos])) ts->parse_pos++; - if (ts->parse_pos >= raw_len) return true; - if (raw_full_lit(raw, raw_len, ts->parse_pos, ts->invoke_end)) { - if (ts->args_open && - !openai_tool_emit_args_fragment(fd, r, id, ts, "}", 1)) return false; - ts->args_open = false; - ts->parse_pos += strlen(ts->invoke_end); - ts->index++; - ts->state = OPENAI_TOOL_BETWEEN_INVOKES; - continue; - } - if (raw_partial_any(raw, raw_len, ts->parse_pos, ts->invoke_end, ts->param_start)) return true; - if (raw_full_lit(raw, raw_len, ts->parse_pos, ts->param_start)) { - size_t before_pos = ts->parse_pos; - openai_tool_stream_state before_state = ts->state; - if (!openai_tool_start_param(fd, r, id, ts, raw, raw_len)) return false; - if (ts->parse_pos == before_pos && ts->state == before_state) return true; - continue; - } - return openai_tool_stream_fail(ts); - } +static bool responses_sse_completed(int fd, const request *r, + responses_stream *st, + const tool_calls *calls, + const responses_tool_item *tool_items, + const char *finish, + int prompt_tokens, int completion_tokens, + long created_at) { + /* Codex routes terminal behaviour off the event type, not response.status. + * Decide here so clients see response.failed / response.incomplete instead + * of a "completed" wrapper marked failed in a sub-field. 
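A finish of "error" maps to response.failed, "length" to response.incomplete with reason "max_tokens", and everything else to response.completed.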
*/ + const char *event_type = "response.completed"; + if (finish && !strcmp(finish, "error")) event_type = "response.failed"; + else if (finish && !strcmp(finish, "length")) event_type = "response.incomplete"; + const char *status = responses_status_for_finish(finish); - if (ts->state == OPENAI_TOOL_PARAM_VALUE) { - const char *end = find_lit_bounded(raw + ts->parse_pos, - raw_len - ts->parse_pos, - ts->param_end); - if (end) { - if (!openai_tool_finish_param(fd, r, id, ts, raw, - (size_t)(end - raw))) return false; - continue; - } - size_t limit = tool_param_value_stream_safe_len(raw, ts->parse_pos, - raw_len, ts->param_end, - ts->param_is_string); - if (limit > ts->parse_pos) { - bool ok = ts->param_is_string ? - openai_tool_emit_string_value(fd, r, id, ts, raw + ts->parse_pos, - limit - ts->parse_pos) : - openai_tool_emit_args_fragment(fd, r, id, ts, raw + ts->parse_pos, - limit - ts->parse_pos); - if (!ok) return false; - ts->parse_pos = limit; - } - return true; + buf b = {0}; + buf_printf(&b, + "{\"type\":\"%s\",\"response\":{\"id\":\"%s\"," + "\"object\":\"response\",\"created_at\":%ld,\"status\":\"%s\",\"model\":", + event_type, st->response_id, created_at, status); + json_escape(&b, r->model); + if (!strcmp(event_type, "response.failed")) { + buf_puts(&b, ",\"error\":{\"code\":\"server_error\"," + "\"message\":\"generation failed\"}"); + } else if (!strcmp(event_type, "response.incomplete")) { + buf_puts(&b, ",\"incomplete_details\":{\"reason\":\"max_tokens\"}"); + } + const char *item_status = responses_item_status_for_finish(finish); + buf_puts(&b, ",\"output\":["); + bool wrote = false; + if (st->reasoning_emitted_any) { + /* Match responses_sse_reasoning_done: if the stream stopped before + * , the reasoning item is partial regardless of the + * response-level finish status, so replay must reject it. */ + const char *reasoning_status = + st->reasoning_closed_naturally ? "completed" : "incomplete"; + buf_printf(&b, + "{\"id\":\"%s\",\"type\":\"reasoning\",\"status\":\"%s\",\"summary\":[", + st->reasoning_id, reasoning_status); + if (st->reasoning_text.len) { + buf_puts(&b, "{\"type\":\"summary_text\",\"text\":"); + json_escape_n(&b, st->reasoning_text.ptr, st->reasoning_text.len); + buf_putc(&b, '}'); } - - return true; + buf_puts(&b, "]}"); + wrote = true; } - return true; + if (st->message_emitted_any) { + if (wrote) buf_putc(&b, ','); + buf_printf(&b, + "{\"id\":\"%s\",\"type\":\"message\",\"status\":\"%s\"," + "\"role\":\"assistant\",\"content\":[{\"type\":\"output_text\",\"text\":", + st->message_id, item_status); + json_escape_n(&b, st->message_text.ptr ? 
st->message_text.ptr : "", + st->message_text.len); + buf_puts(&b, ",\"annotations\":[]}]}"); + wrote = true; + } + if (calls && tool_items) { + for (int i = 0; i < calls->len; i++) { + if (wrote) buf_putc(&b, ','); + responses_append_function_call_item(&b, &calls->v[i], &tool_items[i], item_status, true); + wrote = true; + } + } + buf_putc(&b, ']'); + buf_printf(&b, + ",\"usage\":{\"input_tokens\":%d,\"input_tokens_details\":{\"cached_tokens\":0}," + "\"output_tokens\":%d,\"output_tokens_details\":{\"reasoning_tokens\":0}," + "\"total_tokens\":%d}}}", + prompt_tokens, completion_tokens, prompt_tokens + completion_tokens); + bool ok = responses_sse_emit_event(fd, st, b.ptr); + buf_free(&b); + return ok; } -static bool openai_sse_stream_update(int fd, server *s, const request *r, const char *id, - openai_stream *st, - const char *raw, size_t raw_len, - bool final) { +/* Responses streaming consumes the same raw token text the OpenAI live stream + * consumes: ... is reasoning, anything before the tool-call + * marker is output text. Tool-call argument deltas are not surfaced because + * Codex' SSE parser only ingests function_call items via output_item.done. */ +static bool responses_sse_stream_update(int fd, const request *r, + responses_stream *st, + const char *raw, size_t raw_len, + bool final) { if (!st->active || !raw) return true; - if (st->mode == OPENAI_STREAM_THINKING) { + /* The client only sees reasoning if it explicitly opted in via + * reasoning.summary. Otherwise we still need to walk past ... + * to find the user-visible text, but we suppress the per-chunk emission. */ + const bool emit_reasoning = r->reasoning_summary_emit; + + if (st->mode == RESP_STREAM_THINKING) { if (!st->checked_think_prefix) { + /* The chat template ends the prompt with the literal `` (or + * `` when thinking is off), so generation usually starts + * mid-reasoning rather than with the open tag. If the model does + * happen to repeat `` we skip it; otherwise start from + * position 0. The earlier "no-think-prefix => switch to TEXT" + * shortcut here was incorrect: it leaked reasoning to clients as + * regular output_text because the model was already inside the + * think block when it produced its first token. The actual + * mode change to TEXT happens only when `` is observed. 
*/ const char *open = ""; const size_t open_len = strlen(open); if (raw_len < open_len && !strncmp(raw, open, raw_len) && !final) { @@ -4071,93 +5556,205 @@ static bool openai_sse_stream_update(int fd, server *s, const request *r, const } if (limit > st->emit_pos) { - if (!sse_chat_delta_n(fd, r, id, "reasoning_content", - raw + st->emit_pos, - limit - st->emit_pos)) return false; - st->sent_reasoning = true; + if (emit_reasoning) { + if (!st->reasoning_item_opened) { + st->reasoning_index = st->next_output_index++; + if (!responses_sse_reasoning_added(fd, st)) return false; + st->reasoning_item_opened = true; + } + if (!st->reasoning_summary_started) { + if (!responses_sse_reasoning_summary_part_added(fd, st)) return false; + st->reasoning_summary_started = true; + } + if (!responses_sse_reasoning_delta(fd, st, + raw + st->emit_pos, + limit - st->emit_pos)) return false; + buf_append(&st->reasoning_text, raw + st->emit_pos, limit - st->emit_pos); + st->reasoning_emitted_any = true; + } st->emit_pos = limit; } if (close) { st->emit_pos = (size_t)(close - raw) + strlen(""); - st->mode = OPENAI_STREAM_TEXT; + st->mode = RESP_STREAM_TEXT; + st->reasoning_closed_naturally = true; } else if (final) { - st->mode = OPENAI_STREAM_SUPPRESS; + st->mode = RESP_STREAM_SUPPRESS; return true; } else { return true; } } - if (st->mode == OPENAI_STREAM_TEXT) { + if (st->mode == RESP_STREAM_TEXT) { const char *tool = r->has_tools ? find_any_tool_start(raw + st->emit_pos) : NULL; size_t limit = text_stream_safe_limit(raw, st->emit_pos, raw_len, r->has_tools, final); if (limit > st->emit_pos) { - if (!sse_chat_delta_n(fd, r, id, "content", - raw + st->emit_pos, - limit - st->emit_pos)) return false; - st->sent_content = true; + if (!st->message_item_opened) { + st->message_index = st->next_output_index++; + if (!responses_sse_message_added(fd, st)) return false; + st->message_item_opened = true; + } + if (!st->message_text_part_open) { + if (!responses_sse_message_text_part_added(fd, st)) return false; + st->message_text_part_open = true; + } + if (!responses_sse_output_text_delta(fd, st, + raw + st->emit_pos, + limit - st->emit_pos)) return false; + buf_append(&st->message_text, raw + st->emit_pos, limit - st->emit_pos); + st->message_emitted_any = true; st->emit_pos = limit; } if (tool) { st->emit_pos = (size_t)(tool - raw); - if (openai_tool_stream_init(&st->tool, raw, raw_len, st->emit_pos)) { - st->mode = OPENAI_STREAM_TOOL; - } else { - st->mode = OPENAI_STREAM_SUPPRESS; - } + st->mode = RESP_STREAM_SUPPRESS; } else if (final) { - st->mode = OPENAI_STREAM_SUPPRESS; + st->mode = RESP_STREAM_SUPPRESS; } } + return true; +} - if (st->mode == OPENAI_STREAM_TOOL) { - if (!openai_tool_stream_update(fd, s, r, id, &st->tool, raw, raw_len)) return false; - if (!st->tool.active) st->mode = OPENAI_STREAM_SUPPRESS; +static bool responses_sse_finish_live(int fd, const request *r, + responses_stream *st, + const char *raw, size_t raw_len, + const char *recovered_content, + const tool_calls *calls, + const char *finish, + int prompt_tokens, int completion_tokens, + long created_at) { + if (!responses_sse_stream_update(fd, r, st, raw, raw_len, true)) return false; + + /* Close any half-open reasoning summary so the TUI knows the part ended + * before we slot in any tool calls or completion. 
*/ + if (st->reasoning_item_opened && !st->reasoning_item_closed) { + if (!responses_sse_reasoning_done(fd, st, finish)) return false; + st->reasoning_item_closed = true; + } + /* Recovery path: when DSML tool parsing fails the worker promotes the entire + * generation to assistant text. Streaming had already entered suppress mode + * at the tool marker, so anything in raw[st->emit_pos..raw_len] never made + * it to the client. Emit those bytes as additional output_text deltas so + * what the client accumulates matches output_item.done and the terminal + * response. We use the stream cursor instead of comparing against + * recovered_content because the raw text can begin with `...` + * which the streaming side consumed as reasoning, not message text. */ + if (recovered_content && raw && st->emit_pos < raw_len) { + const char *tail = raw + st->emit_pos; + size_t tail_len = raw_len - st->emit_pos; + if (!st->message_item_opened) { + st->message_index = st->next_output_index++; + if (!responses_sse_message_added(fd, st)) return false; + st->message_item_opened = true; + } + if (!st->message_text_part_open) { + if (!responses_sse_message_text_part_added(fd, st)) return false; + st->message_text_part_open = true; + } + if (!responses_sse_output_text_delta(fd, st, tail, tail_len)) return false; + buf_append(&st->message_text, tail, tail_len); + st->message_emitted_any = true; + st->emit_pos = raw_len; + } + if (st->message_item_opened && !st->message_item_closed) { + if (!responses_sse_message_done(fd, st, finish)) return false; + st->message_item_closed = true; + } + responses_tool_item *items = NULL; + responses_tool_items_build(&items, calls, st->next_output_index); + if (items && calls) st->next_output_index += calls->len; + bool ok = true; + if (items && calls) { + for (int i = 0; i < calls->len && ok; i++) { + ok = responses_sse_function_call_event(fd, st, &calls->v[i], &items[i], finish, false); + if (ok) ok = responses_sse_function_call_arguments_done(fd, st, &calls->v[i], &items[i]); + if (ok) ok = responses_sse_function_call_event(fd, st, &calls->v[i], &items[i], finish, true); + } } - return true; + if (ok) ok = responses_sse_completed(fd, r, st, calls, items, finish, + prompt_tokens, completion_tokens, created_at); + free(items); + return ok; } -static bool openai_sse_finish_live(int fd, server *s, const request *r, const char *id, - openai_stream *st, const char *raw, - size_t raw_len, const tool_calls *calls, - const char *finish, int prompt_tokens, - int completion_tokens) { - if (!openai_sse_stream_update(fd, s, r, id, st, raw, raw_len, true)) return false; +static bool responses_final_response(int fd, const request *r, const char *id, + const char *text, const char *reasoning, + const tool_calls *calls, const char *finish, + int prompt_tokens, int completion_tokens) { + (void)id; + char response_id[40], reasoning_id[40], message_id[40]; + responses_random_id(response_id, sizeof(response_id), "resp_"); + responses_random_id(reasoning_id, sizeof(reasoning_id), "rs_"); + responses_random_id(message_id, sizeof(message_id), "msg_"); + + responses_tool_item *items = NULL; + responses_tool_items_build(&items, calls, 0); - buf b = {0}; long now = (long)time(NULL); - if (calls && calls->len && !st->tool.emitted_any) { - buf_printf(&b, "data: {\"id\":\"%s\",\"object\":\"chat.completion.chunk\",\"created\":%ld,\"model\":", id, now); - json_escape(&b, r->model); - buf_puts(&b, ",\"choices\":[{\"index\":0,\"delta\":{\"tool_calls\":"); - append_tool_call_deltas_json(&b, calls, id, 
&r->tool_orders); - buf_puts(&b, "},\"finish_reason\":null}]}\n\n"); - } - buf_printf(&b, "data: {\"id\":\"%s\",\"object\":\"chat.completion.chunk\",\"created\":%ld,\"model\":", id, now); + const char *status = responses_status_for_finish(finish); + const char *item_status = responses_item_status_for_finish(finish); + buf b = {0}; + buf_printf(&b, + "{\"id\":\"%s\",\"object\":\"response\",\"created_at\":%ld,\"status\":\"%s\"," + "\"model\":", + response_id, now, status); json_escape(&b, r->model); - buf_puts(&b, ",\"choices\":[{\"index\":0,\"delta\":{},\"finish_reason\":"); - json_escape(&b, finish); - buf_puts(&b, "}]}\n\n"); - - bool ok = send_all(fd, b.ptr, b.len) && - sse_done(fd, r, id, prompt_tokens, completion_tokens); + if (finish && !strcmp(finish, "error")) { + buf_puts(&b, ",\"error\":{\"code\":\"server_error\"," + "\"message\":\"generation failed\"}"); + } else if (finish && !strcmp(finish, "length")) { + buf_puts(&b, ",\"incomplete_details\":{\"reason\":\"max_tokens\"}"); + } + buf_puts(&b, ",\"output\":["); + bool wrote = false; + if (reasoning && reasoning[0] && r->reasoning_summary_emit) { + /* Non-streaming path runs after the worker has post-processed the + * generation, so any reasoning here came from a parsed assistant turn + * where was observed (otherwise the reasoning text would be + * empty). Tag it with the response-level item_status which still flips + * to incomplete/failed when finish is length/error. */ + buf_printf(&b, + "{\"id\":\"%s\",\"type\":\"reasoning\",\"status\":\"%s\"," + "\"summary\":[{\"type\":\"summary_text\",\"text\":", + reasoning_id, item_status); + json_escape(&b, reasoning); + buf_puts(&b, "}]}"); + wrote = true; + } + if (text && text[0]) { + if (wrote) buf_putc(&b, ','); + buf_printf(&b, + "{\"id\":\"%s\",\"type\":\"message\",\"status\":\"%s\"," + "\"role\":\"assistant\",\"content\":[{\"type\":\"output_text\",\"text\":", + message_id, item_status); + json_escape(&b, text); + buf_puts(&b, ",\"annotations\":[]}]}"); + wrote = true; + } + if (calls && items) { + for (int i = 0; i < calls->len; i++) { + if (wrote) buf_putc(&b, ','); + responses_append_function_call_item(&b, &calls->v[i], &items[i], item_status, true); + wrote = true; + } + } + buf_putc(&b, ']'); + buf_printf(&b, + ",\"usage\":{\"input_tokens\":%d,\"input_tokens_details\":{\"cached_tokens\":0}," + "\"output_tokens\":%d,\"output_tokens_details\":{\"reasoning_tokens\":0}," + "\"total_tokens\":%d}}", + prompt_tokens, completion_tokens, prompt_tokens + completion_tokens); + bool ok = http_response(fd, 200, "application/json", b.ptr); buf_free(&b); + free(items); return ok; } -static bool request_uses_openai_live_stream(const request *r) { - return r->stream && r->api == API_OPENAI && r->kind == REQ_CHAT; -} - -static bool request_uses_structured_stream(const request *r) { - return r->stream && (r->api == API_ANTHROPIC || - request_uses_openai_live_stream(r)); -} - static bool final_response(int fd, const request *r, const char *id, const char *text, const char *reasoning, const tool_calls *calls, const char *finish, int prompt_tokens, int completion_tokens) { @@ -7058,7 +8655,10 @@ static void generate_job(server *s, job *j) { bool structured_stream = request_uses_structured_stream(&j->req); anthropic_stream anthropic_live = {0}; openai_stream openai_live = {0}; + responses_stream responses_live = {0}; const bool openai_live_chat = request_uses_openai_live_stream(&j->req); + const bool responses_live_chat = request_uses_responses_live_stream(&j->req); + long responses_created_at = 
(long)time(NULL); if (j->req.stream) { if (!sse_headers(j->fd)) { server_log(DS4_LOG_GENERATION, "ds4-server: %s ctx=%s sse headers failed", j->req.kind == REQ_CHAT ? "chat" : "completion", ctx_span); @@ -7079,6 +8679,16 @@ static void generate_job(server *s, job *j) { return; } if (openai_live_chat) openai_stream_start(&j->req, &openai_live); + if (responses_live_chat) { + responses_stream_init(&j->req, &responses_live); + responses_live.active = true; + if (!responses_sse_created(j->fd, &j->req, &responses_live, responses_created_at)) { + server_log(DS4_LOG_GENERATION, "ds4-server: chat ctx=%s responses created event failed", ctx_span); + responses_stream_free(&responses_live); + ds4_tokens_free(&effective_prompt); + return; + } + } } buf text = {0}; @@ -7227,6 +8837,16 @@ static void generate_job(server *s, job *j) { stop_decode = true; break; } + if (responses_live_chat && + !responses_sse_stream_update(j->fd, &j->req, + &responses_live, text.ptr, stream_len, + false)) { + finish = "error"; + snprintf(err, sizeof(err), "client stream write failed"); + free(piece); + stop_decode = true; + break; + } free(piece); if (j->req.kind == REQ_CHAT && j->req.has_tools) { @@ -7385,6 +9005,19 @@ static void generate_job(server *s, job *j) { text.ptr ? text.ptr : "", text.len, &parsed_calls, final_finish, prompt_tokens, completion); + } else if (responses_live_chat) { + /* If parse recovered a malformed tool call back to plain text, + * pass parsed_content so the streaming tail can be flushed; in + * the normal path parsed_content is the assistant text we already + * streamed and the diff is empty. */ + const char *recover = + recovered_tool_parse_failure ? parsed_content : NULL; + response_ok = responses_sse_finish_live(j->fd, &j->req, &responses_live, + text.ptr ? text.ptr : "", text.len, + recover, + &parsed_calls, final_finish, + prompt_tokens, completion, + responses_created_at); } else if (structured_stream) { response_ok = sse_chat_finish(j->fd, &j->req, id, parsed_content ? parsed_content : (text.ptr ? text.ptr : ""), @@ -7407,6 +9040,12 @@ static void generate_job(server *s, job *j) { parsed_reasoning, &parsed_calls, final_finish, prompt_tokens, completion); + } else if (j->req.api == API_RESPONSES) { + responses_final_response(j->fd, &j->req, id, + parsed_content ? parsed_content : (text.ptr ? text.ptr : ""), + parsed_reasoning, + &parsed_calls, final_finish, + prompt_tokens, completion); } else { final_response(j->fd, &j->req, id, parsed_content ? parsed_content : (text.ptr ? 
text.ptr : ""), @@ -7475,6 +9114,7 @@ static void generate_job(server *s, job *j) { free(parsed_reasoning); tool_calls_free(&parsed_calls); openai_stream_free(&openai_live); + responses_stream_free(&responses_live); buf_free(&text); ds4_tokens_free(&effective_prompt); } @@ -7708,6 +9348,9 @@ static void *client_main(void *arg) { } else if (!strcmp(hr.method, "POST") && !strcmp(hr.path, "/v1/chat/completions")) { ok = parse_chat_request(s->engine, s, hr.body, s->default_tokens, ctx_size, &req, err, sizeof(err)); + } else if (!strcmp(hr.method, "POST") && !strcmp(hr.path, "/v1/responses")) { + ok = parse_responses_request(s->engine, s, hr.body, s->default_tokens, + ctx_size, &req, err, sizeof(err)); } else if (!strcmp(hr.method, "POST") && !strcmp(hr.path, "/v1/completions")) { ok = parse_completion_request(s->engine, hr.body, s->default_tokens, ctx_size, &req, err, sizeof(err)); @@ -7957,7 +9600,7 @@ static void usage(FILE *fp) { " ./ds4-server --ctx 100000 --kv-disk-dir /tmp/ds4-kv --kv-disk-space-mb 8192\n" "\n" "Notes:\n" - " Use /v1/chat/completions, /v1/completions, or /v1/messages.\n" + " Use /v1/chat/completions, /v1/responses, /v1/completions, or /v1/messages.\n" " Larger --ctx values allocate more KV memory at startup; the startup log prints the estimate.\n" " Disk KV caching is best for agents that resend long prompts with stable prefixes.\n" "\n"