From 848f32ab64e817b2fe58e754ed797210be6d0ca8 Mon Sep 17 00:00:00 2001 From: Hemachandar Date: Mon, 2 Mar 2026 13:44:17 +0530 Subject: [PATCH] Streamed response support --- app/controllers.php | 35 +++++++++++++++++++++++++++++ src/Executor/Runner/Docker.php | 40 ++++++++++++++++++++++++++++++---- 2 files changed, 71 insertions(+), 4 deletions(-) diff --git a/app/controllers.php b/app/controllers.php index e1a5c85..44f6213 100644 --- a/app/controllers.php +++ b/app/controllers.php @@ -159,6 +159,7 @@ ->param('runtimeEntrypoint', '', new Text(1024, 0), 'Commands to run when creating a container. Maximum of 100 commands are allowed, each 1024 characters long.', true) ->param('logging', true, new Boolean(true), 'Whether executions will be logged.', true) ->param('restartPolicy', DockerAPI::RESTART_NO, new WhiteList([DockerAPI::RESTART_NO, DockerAPI::RESTART_ALWAYS, DockerAPI::RESTART_ON_FAILURE, DockerAPI::RESTART_UNLESS_STOPPED], true), 'Define restart policy once exit code is returned by command. Default value is "no". Possible values are "no", "always", "on-failure", "unless-stopped".', true) + ->param('stream', false, new Boolean(true), 'Whether to stream the response. If the runtime responds with content-type: text/event-stream, chunks are forwarded in real-time.', true) ->inject('response') ->inject('request') ->inject('runner') @@ -180,6 +181,7 @@ function ( string $runtimeEntrypoint, bool $logging, string $restartPolicy, + bool $stream, Response $response, Request $request, Runner $runner @@ -214,6 +216,32 @@ function ( $variables = array_map(strval(...), $variables); + // Streaming path: if $stream=true, pass a callback and pipe SSE chunks directly + $streamingDetected = false; + $streamCallback = null; + + if ($stream) { + $streamCallback = function (?string $data, ?array $runtimeHeaders) use ($response, &$streamingDetected): void { + if ($runtimeHeaders !== null) { + // Headers signal — fired once before any body chunk (only when SSE detected). + // This is the only opportunity to set response headers before body starts. + $streamingDetected = true; + $statusCode = \intval($runtimeHeaders['x-open-runtimes-status-code'] ?? 200); + $response->setStatusCode($statusCode); + foreach ($runtimeHeaders as $key => $value) { + if (\str_starts_with($key, 'x-open-runtimes-')) { + continue; + } + $response->addHeader($key, \is_array($value) ? \implode(', ', $value) : $value); + } + $response->addHeader('x-open-runtimes-streaming', 'true'); + } + if ($data !== null) { + $response->write($data); + } + }; + } + $execution = $runner->createExecution( $runtimeId, $payload, @@ -231,8 +259,15 @@ function ( $runtimeEntrypoint, $logging, $restartPolicy, + streamCallback: $streamCallback, ); + // If streaming was detected (SSE), body was already forwarded — close and return. + if ($streamingDetected) { + $response->end(); + return; + } + // Backwards compatibility for headers $responseFormat = $request->getHeader('x-executor-response-format', '0.10.0'); // Last version without support for array value for headers if (version_compare($responseFormat, '0.11.0', '<')) { diff --git a/src/Executor/Runner/Docker.php b/src/Executor/Runner/Docker.php index 6e81be5..52d5ac5 100644 --- a/src/Executor/Runner/Docker.php +++ b/src/Executor/Runner/Docker.php @@ -539,6 +539,7 @@ public function createExecution( bool $logging, string $restartPolicy, string $region = '', + ?callable $streamCallback = null, ): mixed { $runtimeName = System::getHostname() . '-' . $runtimeId; @@ -751,7 +752,7 @@ public function createExecution( ]; }; - $executeV5 = function () use ($path, $method, $headers, $payload, $secret, $hostname, $timeout, $runtimeName, $logging): array { + $executeV5 = function () use ($path, $method, $headers, $payload, $secret, $hostname, $timeout, $runtimeName, $logging, $streamCallback): array { $statusCode = 0; $errNo = -1; $executorResponse = ''; @@ -759,6 +760,7 @@ public function createExecution( $ch = \curl_init(); $responseHeaders = []; + $isStreaming = false; if (!(\str_starts_with($path, '/'))) { $path = '/' . $path; @@ -772,8 +774,7 @@ public function createExecution( \curl_setopt($ch, CURLOPT_POSTFIELDS, $payload); } - \curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); - \curl_setopt($ch, CURLOPT_HEADERFUNCTION, function ($curl, $header) use (&$responseHeaders): int { + \curl_setopt($ch, CURLOPT_HEADERFUNCTION, function ($curl, $header) use (&$responseHeaders, &$isStreaming): int { $len = strlen($header); $header = explode(':', $header, 2); if (count($header) < 2) { // ignore invalid headers @@ -787,6 +788,10 @@ public function createExecution( $value = \urldecode($value); } + if ($key === 'content-type' && \str_contains(\strtolower($value), 'text/event-stream')) { + $isStreaming = true; + } + if (\array_key_exists($key, $responseHeaders)) { if (is_array($responseHeaders[$key])) { $responseHeaders[$key][] = $value; @@ -800,6 +805,26 @@ public function createExecution( return $len; }); + if ($streamCallback !== null) { + $headersFired = false; + \curl_setopt($ch, CURLOPT_WRITEFUNCTION, function ($curl, $data) use ($streamCallback, &$headersFired, &$responseHeaders, &$isStreaming, &$executorResponse): int { + if ($isStreaming) { + // SSE path: forward chunks to caller + if (!$headersFired) { + $streamCallback(null, $responseHeaders); // headers signal (fired once) + $headersFired = true; + } + $streamCallback($data, null); + } else { + // Non-SSE fallback: accumulate body like CURLOPT_RETURNTRANSFER + $executorResponse .= $data; + } + return \strlen($data); + }); + } else { + \curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); + } + \curl_setopt($ch, CURLOPT_TIMEOUT, (int) $timeout + 5); // Gives extra 5s after safe timeout to recieve response \curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 5); $headers['x-open-runtimes-logging'] = $logging ? 'enabled' : 'disabled'; @@ -816,7 +841,14 @@ public function createExecution( \curl_setopt($ch, CURLOPT_HEADEROPT, CURLHEADER_UNIFIED); \curl_setopt($ch, CURLOPT_HTTPHEADER, $headersArr); - $executorResponse = \curl_exec($ch); + // When CURLOPT_WRITEFUNCTION is set, curl_exec returns true (not the body). + // $executorResponse is either accumulated in the WRITEFUNCTION callback (non-SSE), + // or empty (SSE path, where body was forwarded via $streamCallback). + if ($streamCallback === null) { + $executorResponse = \curl_exec($ch); + } else { + \curl_exec($ch); + } $statusCode = \curl_getinfo($ch, CURLINFO_HTTP_CODE);