Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions app/controllers.php
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@
->param('runtimeEntrypoint', '', new Text(1024, 0), 'Commands to run when creating a container. Maximum of 100 commands are allowed, each 1024 characters long.', true)
->param('logging', true, new Boolean(true), 'Whether executions will be logged.', true)
->param('restartPolicy', DockerAPI::RESTART_NO, new WhiteList([DockerAPI::RESTART_NO, DockerAPI::RESTART_ALWAYS, DockerAPI::RESTART_ON_FAILURE, DockerAPI::RESTART_UNLESS_STOPPED], true), 'Define restart policy once exit code is returned by command. Default value is "no". Possible values are "no", "always", "on-failure", "unless-stopped".', true)
->param('stream', false, new Boolean(true), 'Whether to stream the response. If the runtime responds with content-type: text/event-stream, chunks are forwarded in real-time.', true)
->inject('response')
->inject('request')
->inject('runner')
Expand All @@ -180,6 +181,7 @@ function (
string $runtimeEntrypoint,
bool $logging,
string $restartPolicy,
bool $stream,
Response $response,
Request $request,
Runner $runner
Expand Down Expand Up @@ -214,6 +216,32 @@ function (

$variables = array_map(strval(...), $variables);

// Streaming path: if $stream=true, pass a callback and pipe SSE chunks directly
$streamingDetected = false;
$streamCallback = null;

if ($stream) {
$streamCallback = function (?string $data, ?array $runtimeHeaders) use ($response, &$streamingDetected): void {
if ($runtimeHeaders !== null) {
// Headers signal — fired once before any body chunk (only when SSE detected).
// This is the only opportunity to set response headers before body starts.
$streamingDetected = true;
$statusCode = \intval($runtimeHeaders['x-open-runtimes-status-code'] ?? 200);
$response->setStatusCode($statusCode);
foreach ($runtimeHeaders as $key => $value) {
if (\str_starts_with($key, 'x-open-runtimes-')) {
continue;
}
$response->addHeader($key, \is_array($value) ? \implode(', ', $value) : $value);
}
$response->addHeader('x-open-runtimes-streaming', 'true');
}
if ($data !== null) {
$response->write($data);
}
};
}

$execution = $runner->createExecution(
$runtimeId,
$payload,
Expand All @@ -231,8 +259,15 @@ function (
$runtimeEntrypoint,
$logging,
$restartPolicy,
streamCallback: $streamCallback,
);

// If streaming was detected (SSE), body was already forwarded — close and return.
if ($streamingDetected) {
$response->end();
return;
}

// Backwards compatibility for headers
$responseFormat = $request->getHeader('x-executor-response-format', '0.10.0'); // Last version without support for array value for headers
if (version_compare($responseFormat, '0.11.0', '<')) {
Expand Down
40 changes: 36 additions & 4 deletions src/Executor/Runner/Docker.php
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ public function createExecution(
bool $logging,
string $restartPolicy,
string $region = '',
?callable $streamCallback = null,
): mixed {
$runtimeName = System::getHostname() . '-' . $runtimeId;

Expand Down Expand Up @@ -751,14 +752,15 @@ public function createExecution(
];
};

$executeV5 = function () use ($path, $method, $headers, $payload, $secret, $hostname, $timeout, $runtimeName, $logging): array {
$executeV5 = function () use ($path, $method, $headers, $payload, $secret, $hostname, $timeout, $runtimeName, $logging, $streamCallback): array {
$statusCode = 0;
$errNo = -1;
$executorResponse = '';

$ch = \curl_init();

$responseHeaders = [];
$isStreaming = false;

if (!(\str_starts_with($path, '/'))) {
$path = '/' . $path;
Expand All @@ -772,8 +774,7 @@ public function createExecution(
\curl_setopt($ch, CURLOPT_POSTFIELDS, $payload);
}

\curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
\curl_setopt($ch, CURLOPT_HEADERFUNCTION, function ($curl, $header) use (&$responseHeaders): int {
\curl_setopt($ch, CURLOPT_HEADERFUNCTION, function ($curl, $header) use (&$responseHeaders, &$isStreaming): int {
$len = strlen($header);
$header = explode(':', $header, 2);
if (count($header) < 2) { // ignore invalid headers
Expand All @@ -787,6 +788,10 @@ public function createExecution(
$value = \urldecode($value);
}

if ($key === 'content-type' && \str_contains(\strtolower($value), 'text/event-stream')) {
$isStreaming = true;
}

if (\array_key_exists($key, $responseHeaders)) {
if (is_array($responseHeaders[$key])) {
$responseHeaders[$key][] = $value;
Expand All @@ -800,6 +805,26 @@ public function createExecution(
return $len;
});

if ($streamCallback !== null) {
$headersFired = false;
\curl_setopt($ch, CURLOPT_WRITEFUNCTION, function ($curl, $data) use ($streamCallback, &$headersFired, &$responseHeaders, &$isStreaming, &$executorResponse): int {
if ($isStreaming) {
// SSE path: forward chunks to caller
if (!$headersFired) {
$streamCallback(null, $responseHeaders); // headers signal (fired once)
$headersFired = true;
}
$streamCallback($data, null);
} else {
// Non-SSE fallback: accumulate body like CURLOPT_RETURNTRANSFER
$executorResponse .= $data;
}
return \strlen($data);
});
} else {
\curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
}

\curl_setopt($ch, CURLOPT_TIMEOUT, (int) $timeout + 5); // Gives extra 5s after safe timeout to recieve response
\curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 5);
$headers['x-open-runtimes-logging'] = $logging ? 'enabled' : 'disabled';
Expand All @@ -816,7 +841,14 @@ public function createExecution(
\curl_setopt($ch, CURLOPT_HEADEROPT, CURLHEADER_UNIFIED);
\curl_setopt($ch, CURLOPT_HTTPHEADER, $headersArr);

$executorResponse = \curl_exec($ch);
// When CURLOPT_WRITEFUNCTION is set, curl_exec returns true (not the body).
// $executorResponse is either accumulated in the WRITEFUNCTION callback (non-SSE),
// or empty (SSE path, where body was forwarded via $streamCallback).
if ($streamCallback === null) {
$executorResponse = \curl_exec($ch);
} else {
\curl_exec($ch);
}

$statusCode = \curl_getinfo($ch, CURLINFO_HTTP_CODE);

Expand Down
Loading