Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions phpstan.neon
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
parameters:
level: max
phpVersion: 80400
paths:
- src
excludePaths:
Expand Down
36 changes: 19 additions & 17 deletions src/Handlers/ExceptionHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ final class ExceptionHandler
public static function createFromWorkerError(array $errorData, string $sourceLocation): \Throwable
{
$className = $errorData['class'] ?? \RuntimeException::class;
$message = $errorData['message'] ?? 'Unknown error';
$messageRaw = $errorData['message'] ?? 'Unknown error';
$codeValue = $errorData['code'] ?? 0;
$workerTrace = $errorData['stack_trace'] ?? '';
$workerTraceRaw = $errorData['stack_trace'] ?? '';

assert(\is_string($message));
assert(\is_string($workerTrace));
$message = \is_string($messageRaw) ? $messageRaw : 'Unknown error';
$workerTrace = \is_string($workerTraceRaw) ? $workerTraceRaw : '';

if (! \is_string($className)) {
$className = \RuntimeException::class;
Expand All @@ -36,10 +36,12 @@ public static function createFromWorkerError(array $errorData, string $sourceLoc
$code = self::normalizeExceptionCode($codeValue);
$exception = self::instantiateException($className, $message, $code);

self::setExceptionLocation($exception, $sourceLocation);
$reflection = new \ReflectionObject($exception);

self::setExceptionLocation($exception, $sourceLocation, $reflection);

if ($workerTrace !== '') {
self::appendWorkerStackTrace($exception, $workerTrace);
self::appendWorkerStackTrace($exception, $workerTrace, $reflection);
}

return $exception;
Expand Down Expand Up @@ -110,10 +112,11 @@ private static function instantiateException(string $className, string $message,
*
* @param \Throwable $exception
* @param string $sourceLocation Format: "file:line"
* @param \ReflectionObject $reflection
*
* @return void
*/
private static function setExceptionLocation(\Throwable $exception, string $sourceLocation): void
private static function setExceptionLocation(\Throwable $exception, string $sourceLocation, \ReflectionObject $reflection): void
{
if ($sourceLocation === 'unknown' || ! str_contains($sourceLocation, ':')) {
return;
Expand All @@ -122,17 +125,16 @@ private static function setExceptionLocation(\Throwable $exception, string $sour
try {
[$file, $line] = self::parseSourceLocation($sourceLocation);

$reflection = new \ReflectionObject($exception);
$reflection = self::findReflectionWithProperty($reflection, 'file');
$reflectionClass = self::findReflectionWithProperty($reflection, 'file');

if ($reflection === null) {
if ($reflectionClass === null) {
return;
}

$fileProp = $reflection->getProperty('file');
$fileProp = $reflectionClass->getProperty('file');
$fileProp->setValue($exception, $file);

$lineProp = $reflection->getProperty('line');
$lineProp = $reflectionClass->getProperty('line');
$lineProp->setValue($exception, (int)$line);
} catch (\Throwable) {
// Ignore reflection errors
Expand Down Expand Up @@ -166,20 +168,20 @@ private static function parseSourceLocation(string $sourceLocation): array
*
* @param \Throwable $exception
* @param string $workerTrace
* @param \ReflectionObject $reflection
*
* @return void
*/
private static function appendWorkerStackTrace(\Throwable $exception, string $workerTrace): void
private static function appendWorkerStackTrace(\Throwable $exception, string $workerTrace, \ReflectionObject $reflection): void
{
try {
$reflection = new \ReflectionObject($exception);
$reflection = self::findReflectionWithProperty($reflection, 'trace');
$reflectionClass = self::findReflectionWithProperty($reflection, 'trace');

if ($reflection === null) {
if ($reflectionClass === null) {
return;
}

$traceProp = $reflection->getProperty('trace');
$traceProp = $reflectionClass->getProperty('trace');

$currentTrace = $traceProp->getValue($exception);

Expand Down
2 changes: 1 addition & 1 deletion src/Handlers/ProcessSpawnHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
* background processes, setting up their communication channels, and preparing
* task payloads for execution.
*/
class ProcessSpawnHandler
final class ProcessSpawnHandler
{
private string|int $defaultProcessMemoryLimit;

Expand Down
4 changes: 2 additions & 2 deletions src/Internals/BackgroundProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public function isRunning(): bool

if (PHP_OS_FAMILY === 'Windows') {
$output = shell_exec("tasklist /FI \"PID eq {$this->pid}\" 2>nul");
$running = \is_string($output) && strpos($output, (string)$this->pid) !== false;
$running = \is_string($output) && str_contains($output, (string)$this->pid);
} else {
if (\is_resource($this->processResource)) {
$status = proc_get_status($this->processResource);
Expand All @@ -85,7 +85,7 @@ public function isRunning(): bool
$running = posix_kill($this->pid, 0);
} else {
$output = shell_exec("ps -p {$this->pid} 2>/dev/null");
$running = \is_string($output) && strpos($output, (string)$this->pid) !== false;
$running = \is_string($output) && str_contains($output, (string)$this->pid);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/Managers/ProcessManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
* fire-and-forget background tasks. Implements a singleton pattern for global access,
* but allows direct instantiation for testing and dependency injection.
*/
class ProcessManager
final class ProcessManager
{
/**
* Default maximum number of background tasks allowed to spawn per second.
Expand Down
5 changes: 3 additions & 2 deletions src/ProcessPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Hibla\Parallel;

use Hibla\Parallel\Interfaces\ProcessPoolInterface;
use Hibla\Parallel\Exceptions\PoolShutdownException;
use Hibla\Parallel\Managers\ProcessManager;
use Hibla\Parallel\Managers\ProcessPoolManager;
use Hibla\Parallel\ValueObjects\WorkerMessage;
Expand Down Expand Up @@ -253,12 +254,12 @@ public function boot(): static
*
* @inheritdoc
*
* @return PromiseInterface<TResult>
* @return PromiseInterface<TResult> Rejects with PoolShutdownException if the pool has been shut down.
*/
public function run(callable $callback, ?callable $onMessage = null): PromiseInterface
{
if ($this->isShutdown) {
return Promise::rejected(new \RuntimeException('Cannot submit task to a shutdown pool.'));
return Promise::rejected(new PoolShutdownException('Cannot submit task to a shutdown pool.'));
}

$sourceLocation = 'unknown';
Expand Down
32 changes: 20 additions & 12 deletions src/Utilities/ProcessKiller.php
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ private static function killTreesUnixMapped(array $pids, array $parentMap, array
{
$killedPgids = [];

$childMap = [];
foreach ($parentMap as $child => $parent) {
$childMap[$parent][] = $child;
}

foreach ($pids as $pid) {
$pgid = $pgidMap[$pid] ?? null;

Expand All @@ -119,7 +124,7 @@ private static function killTreesUnixMapped(array $pids, array $parentMap, array
self::sendSignalToGroup($pgid, SIGKILL);
}
} else {
$descendants = self::collectDescendants($pid, $parentMap);
$descendants = self::collectDescendants($pid, $childMap);
foreach (array_reverse($descendants) as $descendantPid) {
self::sendSignal($descendantPid, SIGKILL);
}
Expand Down Expand Up @@ -219,7 +224,7 @@ private static function buildProcMaps(): array
restore_error_handler();
}

return[$parentMap, $pgidMap];
return [$parentMap, $pgidMap];
}

/**
Expand Down Expand Up @@ -249,17 +254,12 @@ private static function parseProcStat(string $stat): ?array

/**
* @param int $pid
* @param array<int, int> $parentMap
* @param array<int, list<int>> $childMap Pre-built child map (pid => list of child pids)
*
* @return list<int>
*/
private static function collectDescendants(int $pid, array $parentMap): array
private static function collectDescendants(int $pid, array $childMap): array
{
$childMap = [];
foreach ($parentMap as $child => $parent) {
$childMap[$parent][] = $child;
}

$result = [];
$queue = [$pid];

Expand All @@ -275,24 +275,32 @@ private static function collectDescendants(int $pid, array $parentMap): array
}

/**
* Fallback strategy using pgrep for recursive discovery.
* Fallback strategy using pgrep for iterative discovery.
*
* @param list<int> $pids
*/
private static function killTreesUnixFallback(array $pids): void
{
foreach ($pids as $pid) {
$queue = $pids;
$toKill = [];

while ($queue !== []) {
$pid = array_shift($queue);
$toKill[] = $pid;

$output = @shell_exec("pgrep -P {$pid} 2>/dev/null");

if (\is_string($output) && $output !== '') {
foreach (explode("\n", trim($output)) as $childPid) {
$childPid = trim($childPid);
if (ctype_digit($childPid) && (int) $childPid > 0) {
self::killTreesUnixFallback([(int) $childPid]);
$queue[] = (int) $childPid;
}
}
}
}

foreach (array_reverse($toKill) as $pid) {
self::sendSignal($pid, SIGKILL);
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/Utilities/SystemUtilities.php
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,10 @@ public static function getCpuCount(): int
}

if (is_readable('/proc/cpuinfo')) {
$count = substr_count((string) file_get_contents('/proc/cpuinfo'), "\nprocessor");
if ($count > 0) {
return self::$cpuCount = $count + 1;
$cpuInfo = (string) file_get_contents('/proc/cpuinfo');
$matchCount = preg_match_all('/^processor/m', $cpuInfo, $cpuMatches);
if ($matchCount !== false && $matchCount > 0) {
return self::$cpuCount = $matchCount;
}
}
}
Expand Down
17 changes: 13 additions & 4 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,11 @@ function spawnFn(callable $task, ?int $timeout = null): callable
* Sends a structured MESSAGE frame to the parent process via stdout,
* bypassing the output buffer so it is never captured as task output.
*
* Supports any serializable PHP value — scalars, arrays, and objects
* all round-trip correctly across the process boundary. Objects are
* transparently serialized using base64(serialize()) and reconstructed
* into their original type on the parent side when building WorkerMessage.
* Supports scalars, arrays of scalars, and objects that implement JsonSerializable.
* Arrays containing objects that do not implement JsonSerializable are serialized
* using base64(serialize()) so they round-trip correctly across the process boundary.
* Standalone objects are also transparently serialized and reconstructed into their
* original type on the parent side when building WorkerMessage.
*
* Silently no-ops when called outside a worker context (e.g., in the
* parent process or in a fire-and-forget background worker where stdout
Expand All @@ -226,6 +227,14 @@ function emit(mixed $data): void

$needsSerialization = \is_object($data) || \is_resource($data);

if (! $needsSerialization && \is_array($data)) {
array_walk_recursive($data, static function (mixed $value) use (&$needsSerialization): void {
if (\is_object($value) || \is_resource($value)) {
$needsSerialization = true;
}
});
}

$frame = [
'status' => 'MESSAGE',
'pid' => getmypid(),
Expand Down
Loading