diff --git a/helper.php b/helper.php index 68f0631..3859b0c 100644 --- a/helper.php +++ b/helper.php @@ -60,26 +60,79 @@ function streamOpenAiApi(object $config, string $prompt, string $content, callab ], ]; - $curl_info[CURLOPT_WRITEFUNCTION] = function ($curl_info, $data) use ($task_callback, $finish_callback) { + $stream_buffer = ''; + $error_buffer = ''; + $http_error_handled = false; + + $curl_info[CURLOPT_WRITEFUNCTION] = function ($curl_handle, $data) use (&$stream_buffer, &$error_buffer, &$http_error_handled, $task_callback, $finish_callback) { Minz_Log::debug('Receive msg:' . $data); - // if http status code != 200, then call the finish_callback to send the error message and stop the stream - if (curl_getinfo($curl_info, CURLINFO_HTTP_CODE) != 200) { - $task_callback(_errorHtmlSuffix(json_decode($data)->error->message)); - $finish_callback(); + $status = curl_getinfo($curl_handle, CURLINFO_HTTP_CODE); + + // Handle non-200 HTTP responses by buffering until we can decode the error payload. + if ($status && $status != 200) { + if (!$http_error_handled) { + $error_buffer .= $data; + $decoded_error = json_decode($error_buffer); + if ($decoded_error !== null) { + $message = isset($decoded_error->error->message) ? $decoded_error->error->message : _t('gen.error.unknown'); + $task_callback(_errorHtmlSuffix($message)); + $finish_callback(); + $http_error_handled = true; + } + } + return strlen($data); } - $msg_list = explode(PHP_EOL, trim($data)); - foreach ($msg_list as $msg) { - $msg = trim(substr(trim($msg), 5)); + $stream_buffer .= $data; + $stream_buffer = str_replace("\r\n", "\n", $stream_buffer); - if ($msg == '') { - continue; - } else if ($msg == "[DONE]") { + while (($event_separator_pos = strpos($stream_buffer, "\n\n")) !== false) { + $raw_event = substr($stream_buffer, 0, $event_separator_pos); + $stream_buffer = substr($stream_buffer, $event_separator_pos + 2); + + $lines = preg_split('/\r?\n/', $raw_event); + $event_name = ''; + $data_lines = []; + + foreach ($lines as $line) { + $line = trim($line); + if ($line === '') { + continue; + } + + if (stripos($line, 'event:') === 0) { + $event_name = trim(substr($line, strlen('event:'))); + continue; + } + + if (stripos($line, 'data:') === 0) { + $data_lines[] = ltrim(substr($line, strlen('data:'))); + } + } + + $event_data = implode("\n", $data_lines); + + if ($event_name === 'done' || $event_data === '[DONE]') { $finish_callback(); - } else { - $task_callback(_dealResponse(json_decode($msg))); + continue; + } + + if ($event_data === '') { + continue; + } + + $decoded_payload = json_decode($event_data); + if ($decoded_payload === null) { + // Wait for more data if JSON is incomplete. + $stream_buffer = $raw_event . "\n\n" . $stream_buffer; + break; + } + + $delta = _dealResponse($decoded_payload); + if ($delta !== '') { + $task_callback($delta); } }