diff --git a/src/KubernetesCluster.php b/src/KubernetesCluster.php index 90ddf6e..3a8a625 100644 --- a/src/KubernetesCluster.php +++ b/src/KubernetesCluster.php @@ -132,6 +132,7 @@ class KubernetesCluster use Traits\Cluster\ChecksClusterVersion; use Traits\Cluster\LoadsFromKubeConfig; use Traits\Cluster\MakesHttpCalls; + use Traits\Cluster\MakesWatchCalls; use Traits\Cluster\MakesWebsocketCalls; /** @@ -176,8 +177,8 @@ public function runOperation(Operation|string $operation, string $path, string|n } return match ($operation) { - Operation::WATCH => $this->watchPath($path, $payload, $query), - Operation::WATCH_LOGS => $this->watchLogsPath($path, $payload, $query), + Operation::WATCH => $this->runWatchOperation($this->watchPath($path, $payload, $query)), + Operation::WATCH_LOGS => $this->runWatchOperation($this->watchLogsPath($path, $payload, $query)), Operation::EXEC => $this->execPath($path, $query), Operation::ATTACH => $this->attachPath($path, $payload, $query), Operation::APPLY => $this->applyPath($path, $payload, $query), @@ -188,156 +189,22 @@ public function runOperation(Operation|string $operation, string $path, string|n } /** - * Watch for the current resource or a resource list. + * Run a watch operation and return the result. * - * @return mixed|null + * @param array{0: \React\EventLoop\LoopInterface, 1: \React\Promise\PromiseInterface} $loopAndPromise */ - protected function watchPath(string $path, Closure $callback, array $query = ['pretty' => 1]): mixed + protected function runWatchOperation(array $loopAndPromise): mixed { - $resourceClass = $this->resourceClass; - $sock = $this->createSocketConnection($this->getCallableUrl($path, $query)); + [$loop, $promise] = $loopAndPromise; - if ($sock === false) { - return null; - } - - // Set stream to non-blocking mode to allow timeout handling - stream_set_blocking($sock, false); - - // Calculate overall timeout: server timeout + buffer for network/processing - $timeout = ($query['timeoutSeconds'] ?? 30) + 5; - $endTime = time() + $timeout; - - $buffer = ''; - - while (time() < $endTime) { - // Try to read data (non-blocking) - $chunk = fread($sock, 8192); - - if ($chunk === false) { - // Error occurred - fclose($sock); - - return null; - } - - if ($chunk === '') { - // No data available, check if stream ended - if (feof($sock)) { - break; - } - - // No data yet, sleep briefly and continue - usleep(100000); // 100ms - - continue; - } - - // Append chunk to buffer - $buffer .= $chunk; - - // Process complete lines from buffer - while (($pos = strpos($buffer, "\n")) !== false) { - $line = substr($buffer, 0, $pos); - $buffer = substr($buffer, $pos + 1); - - if (trim($line) === '') { - continue; - } - - $data = @json_decode($line, true); - - if (! $data || ! isset($data['type'], $data['object'])) { - continue; - } - - ['type' => $type, 'object' => $attributes] = $data; - - $call = call_user_func( - $callback, - $type, - new $resourceClass($this, $attributes) - ); - - if (! is_null($call)) { - fclose($sock); - - return $call; - } - } - } - - fclose($sock); - - return null; - } - - /** - * Watch for the logs for the resource. - * - * @return mixed|null - */ - protected function watchLogsPath(string $path, Closure $callback, array $query = ['pretty' => 1]): mixed - { - $sock = $this->createSocketConnection($this->getCallableUrl($path, $query)); - - if ($sock === false) { - return null; - } - - // Set stream to non-blocking mode to allow timeout handling - stream_set_blocking($sock, false); - - // Calculate overall timeout: server timeout + buffer for network/processing - $timeout = ($query['timeoutSeconds'] ?? 30) + 5; - $endTime = time() + $timeout; - - $buffer = ''; - - while (time() < $endTime) { - // Try to read data (non-blocking) - $chunk = fread($sock, 8192); - - if ($chunk === false) { - // Error occurred - fclose($sock); - - return null; - } - - if ($chunk === '') { - // No data available, check if stream ended - if (feof($sock)) { - break; - } - - // No data yet, sleep briefly and continue - usleep(100000); // 100ms - - continue; - } - - // Append chunk to buffer - $buffer .= $chunk; - - // Process complete lines from buffer - while (($pos = strpos($buffer, "\n")) !== false) { - $line = substr($buffer, 0, $pos); - $buffer = substr($buffer, $pos + 1); - - $call = call_user_func($callback, $line."\n"); - - if (! is_null($call)) { - fclose($sock); - - return $call; - } - } - } + $result = null; + $promise->then(function ($value) use (&$result) { + $result = $value; + }); - fclose($sock); + $loop->run(); - return null; + return $result; } /** diff --git a/src/Traits/Cluster/MakesWatchCalls.php b/src/Traits/Cluster/MakesWatchCalls.php new file mode 100644 index 0000000..acf745e --- /dev/null +++ b/src/Traits/Cluster/MakesWatchCalls.php @@ -0,0 +1,319 @@ +run() to execute, or integrate with your own event loop. + * + * @return array{0: LoopInterface, 1: PromiseInterface} + */ + protected function watchPath(string $path, Closure $callback, array $query = ['pretty' => 1]): array + { + /** @var class-string<\RenokiCo\PhpK8s\Kinds\K8sResource> $resourceClass */ + $resourceClass = $this->resourceClass; + $cluster = $this; + + return $this->createAsyncStreamConnection( + $this->getCallableUrl($path, $query), + function (string $line) use ($callback, $resourceClass, $cluster) { + $data = @json_decode($line, true); + + if (! $data || ! isset($data['type'], $data['object'])) { + return null; + } + + ['type' => $type, 'object' => $attributes] = $data; + + return call_user_func( + $callback, + $type, + new $resourceClass($cluster, $attributes) + ); + } + ); + } + + /** + * Watch for the logs for the resource. + * + * Returns the event loop and promise for async operation. + * Call $loop->run() to execute, or integrate with your own event loop. + * + * @return array{0: LoopInterface, 1: PromiseInterface} + */ + protected function watchLogsPath(string $path, Closure $callback, array $query = ['pretty' => 1]): array + { + return $this->createAsyncStreamConnection( + $this->getCallableUrl($path, $query), + function (string $line) use ($callback) { + return call_user_func($callback, $line."\n"); + } + ); + } + + /** + * Create an async streaming HTTP connection using ReactPHP. + * + * @param Closure $lineCallback Called for each line received, return non-null to stop. + * @return array{0: LoopInterface, 1: PromiseInterface} + */ + protected function createAsyncStreamConnection(string $url, Closure $lineCallback): array + { + $parsed = parse_url($url); + $scheme = $parsed['scheme'] ?? 'http'; + $host = $parsed['host'] ?? 'localhost'; + $port = $parsed['port'] ?? ($scheme === 'https' ? 443 : 80); + $requestPath = ($parsed['path'] ?? '/'). + (isset($parsed['query']) ? '?'.$parsed['query'] : ''); + + $loop = Loop::get(); + $deferred = new Deferred; + + $connectorOptions = [ + 'timeout' => $this->timeout ?? 30.0, + 'tls' => $this->buildReactTlsOptions(), + ]; + + $connector = new ReactSocketConnector($connectorOptions, $loop); + + $uri = ($scheme === 'https' ? 'tls' : 'tcp')."://{$host}:{$port}"; + + $connector->connect($uri)->then( + function (ConnectionInterface $connection) use ($requestPath, $host, $port, $lineCallback, $deferred) { + // Build and send HTTP request + $request = $this->buildHttpRequest($requestPath, $host, $port); + $connection->write($request); + + $buffer = ''; + $headersParsed = false; + $isChunked = false; + $chunkBuffer = ''; + $expectedChunkSize = null; + + $connection->on('data', function ($chunk) use ( + &$buffer, + &$headersParsed, + &$isChunked, + &$chunkBuffer, + &$expectedChunkSize, + $lineCallback, + $connection, + $deferred + ) { + $buffer .= $chunk; + + // Parse HTTP headers first + if (! $headersParsed) { + $headerEnd = strpos($buffer, "\r\n\r\n"); + if ($headerEnd === false) { + return; // Wait for complete headers + } + + $headers = substr($buffer, 0, $headerEnd); + $buffer = substr($buffer, $headerEnd + 4); + $headersParsed = true; + + // Check for chunked transfer encoding + if (stripos($headers, 'transfer-encoding: chunked') !== false) { + $isChunked = true; + } + } + + // Process body content + if ($isChunked) { + $this->processChunkedData( + $buffer, + $chunkBuffer, + $expectedChunkSize, + $lineCallback, + $connection, + $deferred + ); + } else { + $this->processStreamData( + $buffer, + $lineCallback, + $connection, + $deferred + ); + } + }); + + $connection->on('close', function () use ($deferred) { + $deferred->resolve(null); + }); + + $connection->on('error', function (Exception $e) use ($deferred) { + $deferred->reject($e); + }); + }, + function (Exception $e) use ($deferred) { + $deferred->reject($e); + } + ); + + return [$loop, $deferred->promise()]; + } + + /** + * Build HTTP request string with authentication headers. + */ + protected function buildHttpRequest(string $path, string $host, int $port): string + { + $hostHeader = $port === 80 || $port === 443 ? $host : "{$host}:{$port}"; + + $headers = [ + "GET {$path} HTTP/1.1", + "Host: {$hostHeader}", + 'Accept: application/json', + 'Connection: keep-alive', + ]; + + // Add authentication + $authToken = $this->getAuthToken(); + if ($authToken) { + $headers[] = "Authorization: Bearer {$authToken}"; + } elseif ($this->auth) { + $headers[] = 'Authorization: Basic '.base64_encode(implode(':', $this->auth)); + } + + return implode("\r\n", $headers)."\r\n\r\n"; + } + + /** + * Build TLS options for React socket connector. + */ + protected function buildReactTlsOptions(): array + { + $tlsOptions = []; + + if (is_bool($this->verify)) { + $tlsOptions['verify_peer'] = $this->verify; + $tlsOptions['verify_peer_name'] = $this->verify; + } elseif (is_string($this->verify)) { + $tlsOptions['cafile'] = $this->verify; + } + + if ($this->cert) { + $tlsOptions['local_cert'] = $this->cert; + } + + if ($this->sslKey) { + $tlsOptions['local_pk'] = $this->sslKey; + } + + return $tlsOptions; + } + + /** + * Process non-chunked streaming data. + */ + protected function processStreamData( + string &$buffer, + Closure $lineCallback, + ConnectionInterface $connection, + Deferred $deferred + ): void { + while (($pos = strpos($buffer, "\n")) !== false) { + $line = substr($buffer, 0, $pos); + $buffer = substr($buffer, $pos + 1); + + $line = trim($line); + if ($line === '') { + continue; + } + + $result = $lineCallback($line); + if ($result !== null) { + $deferred->resolve($result); + $connection->close(); + + return; + } + } + } + + /** + * Process chunked transfer encoding data. + */ + protected function processChunkedData( + string &$buffer, + string &$chunkBuffer, + ?int &$expectedChunkSize, + Closure $lineCallback, + ConnectionInterface $connection, + Deferred $deferred + ): void { + while (true) { + // If we don't have a chunk size, try to read one + if ($expectedChunkSize === null) { + $sizeEnd = strpos($buffer, "\r\n"); + if ($sizeEnd === false) { + return; // Wait for chunk size line + } + + $sizeHex = substr($buffer, 0, $sizeEnd); + $buffer = substr($buffer, $sizeEnd + 2); + $expectedChunkSize = hexdec($sizeHex); + + // Zero size means end of stream + if ($expectedChunkSize === 0) { + $connection->close(); + $deferred->resolve(null); + + return; + } + } + + // Read chunk data + if (strlen($buffer) < $expectedChunkSize + 2) { + return; // Wait for complete chunk + CRLF + } + + $chunkData = substr($buffer, 0, $expectedChunkSize); + $buffer = substr($buffer, $expectedChunkSize + 2); // Skip CRLF after chunk + $expectedChunkSize = null; + + // Add to line buffer and process complete lines + $chunkBuffer .= $chunkData; + + while (($pos = strpos($chunkBuffer, "\n")) !== false) { + $line = substr($chunkBuffer, 0, $pos); + $chunkBuffer = substr($chunkBuffer, $pos + 1); + + $line = trim($line); + if ($line === '') { + continue; + } + + $result = $lineCallback($line); + if ($result !== null) { + $deferred->resolve($result); + $connection->close(); + + return; + } + } + } + } +} diff --git a/src/Traits/Cluster/MakesWebsocketCalls.php b/src/Traits/Cluster/MakesWebsocketCalls.php index 5ff0a48..8ba8738 100644 --- a/src/Traits/Cluster/MakesWebsocketCalls.php +++ b/src/Traits/Cluster/MakesWebsocketCalls.php @@ -6,7 +6,7 @@ use Exception; use Illuminate\Support\Str; use Ratchet\Client\Connector as WebSocketConnector; -use React\EventLoop\Factory as ReactFactory; +use React\EventLoop\Loop; use React\Socket\Connector as ReactSocketConnector; trait MakesWebsocketCalls @@ -63,7 +63,7 @@ public function getWsClient(string $url): array $options['tls']['local_pk'] = $this->sslKey; } - $loop = ReactFactory::create(); + $loop = Loop::get(); $socketConnector = new ReactSocketConnector($options, $loop); $wsConnector = new WebSocketConnector($loop, $socketConnector); @@ -159,9 +159,13 @@ protected function makeWsRequest(string $path, ?Closure $callback = null, array $messages = []; + $error = null; + if ($callback) { $ws->then(function ($connection) use ($callback) { $callback($connection); + }, function (Exception $e) use (&$error) { + $error = $e; }); } else { $ws->then(function ($connection) use (&$externalConnection, &$messages) { @@ -178,8 +182,8 @@ protected function makeWsRequest(string $path, ?Closure $callback = null, array 'output' => $message, ]; }); - }, function (Exception $e) { - throw $e; + }, function (Exception $e) use (&$error) { + $error = $e; }); } @@ -189,6 +193,11 @@ protected function makeWsRequest(string $path, ?Closure $callback = null, array */ $loop->run(); + // Re-throw any connection error that occurred. + if ($error) { + throw $error; + } + // Make sure to close the WS connection. if ($externalConnection) { $externalConnection->close();