From 0a98641b50f7d191387577f7bd2207040979752e Mon Sep 17 00:00:00 2001 From: Jeffrey Bulanadi <41933086+jeffreybulanadi@users.noreply.github.com> Date: Tue, 5 May 2026 14:23:23 +0800 Subject: [PATCH] Various improvements across process management utilities --- phpstan.neon | 1 + src/Handlers/ExceptionHandler.php | 36 +++++++++++++++------------- src/Handlers/ProcessSpawnHandler.php | 2 +- src/Internals/BackgroundProcess.php | 4 ++-- src/Managers/ProcessManager.php | 2 +- src/ProcessPool.php | 5 ++-- src/Utilities/ProcessKiller.php | 32 +++++++++++++++---------- src/Utilities/SystemUtilities.php | 7 +++--- src/functions.php | 17 +++++++++---- 9 files changed, 64 insertions(+), 42 deletions(-) diff --git a/phpstan.neon b/phpstan.neon index c3086f1..742685f 100644 --- a/phpstan.neon +++ b/phpstan.neon @@ -1,5 +1,6 @@ parameters: level: max + phpVersion: 80400 paths: - src excludePaths: diff --git a/src/Handlers/ExceptionHandler.php b/src/Handlers/ExceptionHandler.php index 7c417d4..6fed5c7 100644 --- a/src/Handlers/ExceptionHandler.php +++ b/src/Handlers/ExceptionHandler.php @@ -22,12 +22,12 @@ final class ExceptionHandler public static function createFromWorkerError(array $errorData, string $sourceLocation): \Throwable { $className = $errorData['class'] ?? \RuntimeException::class; - $message = $errorData['message'] ?? 'Unknown error'; + $messageRaw = $errorData['message'] ?? 'Unknown error'; $codeValue = $errorData['code'] ?? 0; - $workerTrace = $errorData['stack_trace'] ?? ''; + $workerTraceRaw = $errorData['stack_trace'] ?? ''; - assert(\is_string($message)); - assert(\is_string($workerTrace)); + $message = \is_string($messageRaw) ? $messageRaw : 'Unknown error'; + $workerTrace = \is_string($workerTraceRaw) ? $workerTraceRaw : ''; if (! \is_string($className)) { $className = \RuntimeException::class; @@ -36,10 +36,12 @@ public static function createFromWorkerError(array $errorData, string $sourceLoc $code = self::normalizeExceptionCode($codeValue); $exception = self::instantiateException($className, $message, $code); - self::setExceptionLocation($exception, $sourceLocation); + $reflection = new \ReflectionObject($exception); + + self::setExceptionLocation($exception, $sourceLocation, $reflection); if ($workerTrace !== '') { - self::appendWorkerStackTrace($exception, $workerTrace); + self::appendWorkerStackTrace($exception, $workerTrace, $reflection); } return $exception; @@ -110,10 +112,11 @@ private static function instantiateException(string $className, string $message, * * @param \Throwable $exception * @param string $sourceLocation Format: "file:line" + * @param \ReflectionObject $reflection * * @return void */ - private static function setExceptionLocation(\Throwable $exception, string $sourceLocation): void + private static function setExceptionLocation(\Throwable $exception, string $sourceLocation, \ReflectionObject $reflection): void { if ($sourceLocation === 'unknown' || ! str_contains($sourceLocation, ':')) { return; @@ -122,17 +125,16 @@ private static function setExceptionLocation(\Throwable $exception, string $sour try { [$file, $line] = self::parseSourceLocation($sourceLocation); - $reflection = new \ReflectionObject($exception); - $reflection = self::findReflectionWithProperty($reflection, 'file'); + $reflectionClass = self::findReflectionWithProperty($reflection, 'file'); - if ($reflection === null) { + if ($reflectionClass === null) { return; } - $fileProp = $reflection->getProperty('file'); + $fileProp = $reflectionClass->getProperty('file'); $fileProp->setValue($exception, $file); - $lineProp = $reflection->getProperty('line'); + $lineProp = $reflectionClass->getProperty('line'); $lineProp->setValue($exception, (int)$line); } catch (\Throwable) { // Ignore reflection errors @@ -166,20 +168,20 @@ private static function parseSourceLocation(string $sourceLocation): array * * @param \Throwable $exception * @param string $workerTrace + * @param \ReflectionObject $reflection * * @return void */ - private static function appendWorkerStackTrace(\Throwable $exception, string $workerTrace): void + private static function appendWorkerStackTrace(\Throwable $exception, string $workerTrace, \ReflectionObject $reflection): void { try { - $reflection = new \ReflectionObject($exception); - $reflection = self::findReflectionWithProperty($reflection, 'trace'); + $reflectionClass = self::findReflectionWithProperty($reflection, 'trace'); - if ($reflection === null) { + if ($reflectionClass === null) { return; } - $traceProp = $reflection->getProperty('trace'); + $traceProp = $reflectionClass->getProperty('trace'); $currentTrace = $traceProp->getValue($exception); diff --git a/src/Handlers/ProcessSpawnHandler.php b/src/Handlers/ProcessSpawnHandler.php index 57e54e8..5ebc766 100644 --- a/src/Handlers/ProcessSpawnHandler.php +++ b/src/Handlers/ProcessSpawnHandler.php @@ -24,7 +24,7 @@ * background processes, setting up their communication channels, and preparing * task payloads for execution. */ -class ProcessSpawnHandler +final class ProcessSpawnHandler { private string|int $defaultProcessMemoryLimit; diff --git a/src/Internals/BackgroundProcess.php b/src/Internals/BackgroundProcess.php index 9039931..6789823 100644 --- a/src/Internals/BackgroundProcess.php +++ b/src/Internals/BackgroundProcess.php @@ -76,7 +76,7 @@ public function isRunning(): bool if (PHP_OS_FAMILY === 'Windows') { $output = shell_exec("tasklist /FI \"PID eq {$this->pid}\" 2>nul"); - $running = \is_string($output) && strpos($output, (string)$this->pid) !== false; + $running = \is_string($output) && str_contains($output, (string)$this->pid); } else { if (\is_resource($this->processResource)) { $status = proc_get_status($this->processResource); @@ -85,7 +85,7 @@ public function isRunning(): bool $running = posix_kill($this->pid, 0); } else { $output = shell_exec("ps -p {$this->pid} 2>/dev/null"); - $running = \is_string($output) && strpos($output, (string)$this->pid) !== false; + $running = \is_string($output) && str_contains($output, (string)$this->pid); } } diff --git a/src/Managers/ProcessManager.php b/src/Managers/ProcessManager.php index a74c34e..aaba321 100644 --- a/src/Managers/ProcessManager.php +++ b/src/Managers/ProcessManager.php @@ -24,7 +24,7 @@ * fire-and-forget background tasks. Implements a singleton pattern for global access, * but allows direct instantiation for testing and dependency injection. */ -class ProcessManager +final class ProcessManager { /** * Default maximum number of background tasks allowed to spawn per second. diff --git a/src/ProcessPool.php b/src/ProcessPool.php index a5c7af6..6f0c803 100644 --- a/src/ProcessPool.php +++ b/src/ProcessPool.php @@ -5,6 +5,7 @@ namespace Hibla\Parallel; use Hibla\Parallel\Interfaces\ProcessPoolInterface; +use Hibla\Parallel\Exceptions\PoolShutdownException; use Hibla\Parallel\Managers\ProcessManager; use Hibla\Parallel\Managers\ProcessPoolManager; use Hibla\Parallel\ValueObjects\WorkerMessage; @@ -253,12 +254,12 @@ public function boot(): static * * @inheritdoc * - * @return PromiseInterface + * @return PromiseInterface Rejects with PoolShutdownException if the pool has been shut down. */ public function run(callable $callback, ?callable $onMessage = null): PromiseInterface { if ($this->isShutdown) { - return Promise::rejected(new \RuntimeException('Cannot submit task to a shutdown pool.')); + return Promise::rejected(new PoolShutdownException('Cannot submit task to a shutdown pool.')); } $sourceLocation = 'unknown'; diff --git a/src/Utilities/ProcessKiller.php b/src/Utilities/ProcessKiller.php index aea5f27..14005ca 100644 --- a/src/Utilities/ProcessKiller.php +++ b/src/Utilities/ProcessKiller.php @@ -110,6 +110,11 @@ private static function killTreesUnixMapped(array $pids, array $parentMap, array { $killedPgids = []; + $childMap = []; + foreach ($parentMap as $child => $parent) { + $childMap[$parent][] = $child; + } + foreach ($pids as $pid) { $pgid = $pgidMap[$pid] ?? null; @@ -119,7 +124,7 @@ private static function killTreesUnixMapped(array $pids, array $parentMap, array self::sendSignalToGroup($pgid, SIGKILL); } } else { - $descendants = self::collectDescendants($pid, $parentMap); + $descendants = self::collectDescendants($pid, $childMap); foreach (array_reverse($descendants) as $descendantPid) { self::sendSignal($descendantPid, SIGKILL); } @@ -219,7 +224,7 @@ private static function buildProcMaps(): array restore_error_handler(); } - return[$parentMap, $pgidMap]; + return [$parentMap, $pgidMap]; } /** @@ -249,17 +254,12 @@ private static function parseProcStat(string $stat): ?array /** * @param int $pid - * @param array $parentMap + * @param array> $childMap Pre-built child map (pid => list of child pids) * * @return list */ - private static function collectDescendants(int $pid, array $parentMap): array + private static function collectDescendants(int $pid, array $childMap): array { - $childMap = []; - foreach ($parentMap as $child => $parent) { - $childMap[$parent][] = $child; - } - $result = []; $queue = [$pid]; @@ -275,24 +275,32 @@ private static function collectDescendants(int $pid, array $parentMap): array } /** - * Fallback strategy using pgrep for recursive discovery. + * Fallback strategy using pgrep for iterative discovery. * * @param list $pids */ private static function killTreesUnixFallback(array $pids): void { - foreach ($pids as $pid) { + $queue = $pids; + $toKill = []; + + while ($queue !== []) { + $pid = array_shift($queue); + $toKill[] = $pid; + $output = @shell_exec("pgrep -P {$pid} 2>/dev/null"); if (\is_string($output) && $output !== '') { foreach (explode("\n", trim($output)) as $childPid) { $childPid = trim($childPid); if (ctype_digit($childPid) && (int) $childPid > 0) { - self::killTreesUnixFallback([(int) $childPid]); + $queue[] = (int) $childPid; } } } + } + foreach (array_reverse($toKill) as $pid) { self::sendSignal($pid, SIGKILL); } } diff --git a/src/Utilities/SystemUtilities.php b/src/Utilities/SystemUtilities.php index 6fff1df..44c7f71 100644 --- a/src/Utilities/SystemUtilities.php +++ b/src/Utilities/SystemUtilities.php @@ -186,9 +186,10 @@ public static function getCpuCount(): int } if (is_readable('/proc/cpuinfo')) { - $count = substr_count((string) file_get_contents('/proc/cpuinfo'), "\nprocessor"); - if ($count > 0) { - return self::$cpuCount = $count + 1; + $cpuInfo = (string) file_get_contents('/proc/cpuinfo'); + $matchCount = preg_match_all('/^processor/m', $cpuInfo, $cpuMatches); + if ($matchCount !== false && $matchCount > 0) { + return self::$cpuCount = $matchCount; } } } diff --git a/src/functions.php b/src/functions.php index 5e58e24..629e953 100644 --- a/src/functions.php +++ b/src/functions.php @@ -203,10 +203,11 @@ function spawnFn(callable $task, ?int $timeout = null): callable * Sends a structured MESSAGE frame to the parent process via stdout, * bypassing the output buffer so it is never captured as task output. * - * Supports any serializable PHP value — scalars, arrays, and objects - * all round-trip correctly across the process boundary. Objects are - * transparently serialized using base64(serialize()) and reconstructed - * into their original type on the parent side when building WorkerMessage. + * Supports scalars, arrays of scalars, and objects that implement JsonSerializable. + * Arrays containing objects that do not implement JsonSerializable are serialized + * using base64(serialize()) so they round-trip correctly across the process boundary. + * Standalone objects are also transparently serialized and reconstructed into their + * original type on the parent side when building WorkerMessage. * * Silently no-ops when called outside a worker context (e.g., in the * parent process or in a fire-and-forget background worker where stdout @@ -226,6 +227,14 @@ function emit(mixed $data): void $needsSerialization = \is_object($data) || \is_resource($data); + if (! $needsSerialization && \is_array($data)) { + array_walk_recursive($data, static function (mixed $value) use (&$needsSerialization): void { + if (\is_object($value) || \is_resource($value)) { + $needsSerialization = true; + } + }); + } + $frame = [ 'status' => 'MESSAGE', 'pid' => getmypid(),