From 13db14fd854e1317e2864777d6401f0073f796b9 Mon Sep 17 00:00:00 2001 From: przepompownia Date: Tue, 30 Dec 2025 00:14:44 +0100 Subject: [PATCH 1/4] Switch to amphp/amp v3 --- .github/workflows/ci.yml | 18 +- .gitignore | 1 + README.md | 13 +- bin/watch | 36 +-- composer.json | 8 +- src/SystemDetector/CommandDetector.php | 27 +- src/Watcher.php | 13 +- .../BufferedWatcher/BufferedWatcher.php | 10 +- .../BufferedWatcherProcess.php | 35 +- src/Watcher/Fallback/FallbackWatcher.php | 51 ++- src/Watcher/Find/FindWatcher.php | 183 +++++------ src/Watcher/FsWatch/FsWatchWatcher.php | 93 +++--- src/Watcher/Inotify/InotifyWatcher.php | 181 +++++------ src/Watcher/Null/NullWatcher.php | 21 +- .../PatternMatchingWatcher.php | 12 +- .../PatternMatching/PatternWatcherProcess.php | 30 +- src/Watcher/PhpPollWatcher/PhpPollWatcher.php | 135 ++++---- src/Watcher/TestWatcher/TestWatcher.php | 40 ++- src/Watcher/Watchman/WatchmanWatcher.php | 306 +++++++++--------- src/WatcherProcess.php | 7 +- .../BufferedWatcher/BufferedWatcherTest.php | 35 +- .../Watcher/Fallback/FallbackWatcherTest.php | 27 +- tests/Watcher/Find/FindWatcherTest.php | 16 +- tests/Watcher/FsWatch/FsWatchWatcherTest.php | 14 +- tests/Watcher/Inotify/InotifyWatcherTest.php | 41 +-- .../PatternMatchingWatcherTest.php | 18 +- .../PhpPollWatcher/PhpPollWatcherTest.php | 9 +- tests/Watcher/WatcherTestCase.php | 79 +++-- .../Watcher/Watchman/WatchmanWatcherTest.php | 14 +- 29 files changed, 678 insertions(+), 795 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 385fef6..cdb73d4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -19,7 +19,7 @@ jobs: strategy: matrix: php-version: - - '8.1' + - '8.2' steps: - @@ -52,7 +52,7 @@ jobs: strategy: matrix: php-version: - - '8.1' + - '8.2' steps: - @@ -85,8 +85,10 @@ jobs: strategy: matrix: php-version: - - '8.0' - - '8.1' + - '8.2' + - '8.3' + - '8.4' + - '8.5' steps: - @@ -109,11 +111,11 @@ jobs: - name: "Install Watchman" run: | - wget https://github.com/facebook/watchman/releases/download/v2020.09.14.00/watchman-v2020.09.14.00-linux.zip - unzip watchman-v2020.09.14.00-linux.zip + wget https://github.com/facebook/watchman/releases/download/v2025.12.22.00/watchman-v2025.12.22.00-linux.zip + unzip watchman-v2025.12.22.00-linux.zip sudo mkdir -p /usr/local/{bin,lib} /usr/local/var/run/watchman - sudo mv watchman-v2020.09.14.00-linux/bin/watchman /usr/local/bin/watchman - sudo mv watchman-v2020.09.14.00-linux/lib/* /usr/local/lib/ + sudo mv watchman-v2025.12.22.00-linux/bin/watchman /usr/local/bin/watchman + sudo mv watchman-v2025.12.22.00-linux/lib/* /usr/local/lib/ sudo chmod 755 /usr/local/bin/watchman sudo chmod 2777 /usr/local/var/run/watchman - diff --git a/.gitignore b/.gitignore index 505ac83..337a0e1 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,4 @@ /.vscode .php_cs.cache .php-cs-fixer.cache +/.phpactor.json diff --git a/README.md b/README.md index 83ad72d..c5062c8 100644 --- a/README.md +++ b/README.md @@ -4,12 +4,11 @@ Amp FS Watch ![CI](https://github.com/phpactor/amp-fswatch/workflows/CI/badge.svg) This is an [Amp](https://amphp.org/) library for asynchronously monitor paths -on your file system changes using various stategues. +on your file system changes using various stategies. It's been created to trigger code indexing in [Phpactor](https://github.com/phpactor/phpactor). -- Promise based API. - Capable of automatically selecting a supported watcher for the current environment. - Provides realtime (e.g. ``inotify``) watchers in addition to polling ones. @@ -39,9 +38,15 @@ Loop::run(function () use () { [] ); - $process = yield $watcher->watch([$path]); + $process = $watcher->watch([$path]); - while (null !== $file = yield $process->wait()) { + if (defined('SIGINT')) { + EventLoop::onSignal(SIGINT, function () use ($process): void { + $process->stop(); + }); + } + + while (null !== $file = $process->wait()) { fwrite(STDOUT, sprintf('[%s] %s (%s)'."\n", date('Y-m-d H:i:s.u'), $file->path(), $file->type())); } }); diff --git a/bin/watch b/bin/watch index df975d1..1f9a581 100755 --- a/bin/watch +++ b/bin/watch @@ -1,33 +1,30 @@ #!/usr/bin/env php watch([$path]); +$process = $watcher->watch(); - while (null !== $file = yield $process->wait()) { - fwrite(STDOUT, sprintf('[%s] %s (%s)'."\n", date('Y-m-d H:i:s.u'), $file->path(), $file->type())); - } +// Signals are not supported on Windows +if (defined('SIGINT')) { + EventLoop::onSignal(SIGINT, function () use ($process): void { + $process->stop(); + }); +} - // Signals are not supported on Windows - if(defined('SIGINT')) { - Loop::onSignal(SIGINT, function () use ($process) { - $process->stop(); - Loop::stop(); - }); - } -}); +while (null !== $file = $process->wait()) { + fwrite(STDOUT, sprintf('[%s] %s (%s)' . "\n", date('Y-m-d H:i:s.u'), $file->path(), $file->type())); +} diff --git a/composer.json b/composer.json index 7ad4875..cd13ca0 100644 --- a/composer.json +++ b/composer.json @@ -10,15 +10,15 @@ } ], "require": { - "php": "^8.0", - "amphp/amp": "^2.4", - "amphp/process": "^1.1", + "php": "^8.2", + "amphp/amp": "^3.1", + "amphp/process": "^2", "psr/log": "^1.1", "webmozart/glob": "^4.4", "symfony/filesystem": "^5.0|^6.0" }, "require-dev": { - "amphp/phpunit-util": "^1.3", + "amphp/phpunit-util": "v3.0.0", "ergebnis/composer-normalize": "^2.0", "friendsofphp/php-cs-fixer": "^3.15", "jangregor/phpstan-prophecy": "^1.0", diff --git a/src/SystemDetector/CommandDetector.php b/src/SystemDetector/CommandDetector.php index 1bd143e..e9277b1 100644 --- a/src/SystemDetector/CommandDetector.php +++ b/src/SystemDetector/CommandDetector.php @@ -3,33 +3,22 @@ namespace Phpactor\AmpFsWatch\SystemDetector; use Amp\Process\Process; -use Amp\Promise; class CommandDetector { - /** - * @return Promise - */ - public function commandExists(string $command): Promise + public function commandExists(string $command): bool { return $this->checkPosixCommand($command); } - /** - * @return Promise - */ - private function checkPosixCommand(string $command): Promise + private function checkPosixCommand(string $command): bool { - return \Amp\call(function () use ($command) { - $process = new Process([ - 'command', - '-v', - $command - ]); + $process = Process::start([ + 'command', + '-v', + $command, + ]); - yield $process->start(); - - return 0 === yield $process->join(); - }); + return 0 === $process->join(); } } diff --git a/src/Watcher.php b/src/Watcher.php index 3b8c16e..e196525 100644 --- a/src/Watcher.php +++ b/src/Watcher.php @@ -2,20 +2,11 @@ namespace Phpactor\AmpFsWatch; -use Amp\Promise; - interface Watcher { - /** - * @return Promise - */ - public function watch(): Promise; - - /** - * @return Promise - */ - public function isSupported(): Promise; + public function watch(): WatcherProcess; + public function isSupported(): bool; public function describe(): string; } diff --git a/src/Watcher/BufferedWatcher/BufferedWatcher.php b/src/Watcher/BufferedWatcher/BufferedWatcher.php index a74db58..202bbe3 100644 --- a/src/Watcher/BufferedWatcher/BufferedWatcher.php +++ b/src/Watcher/BufferedWatcher/BufferedWatcher.php @@ -2,8 +2,8 @@ namespace Phpactor\AmpFsWatch\Watcher\BufferedWatcher; -use Amp\Promise; use Phpactor\AmpFsWatch\Watcher; +use Phpactor\AmpFsWatch\WatcherProcess; class BufferedWatcher implements Watcher { @@ -17,15 +17,13 @@ public function __construct(Watcher $innerWatcher, int $interval) $this->interval = $interval; } - public function watch(): Promise + public function watch(): WatcherProcess { - return \Amp\call(function () { - return new BufferedWatcherProcess(yield $this->innerWatcher->watch(), $this->interval); - }); + return new BufferedWatcherProcess($this->innerWatcher->watch(), $this->interval); } - public function isSupported(): Promise + public function isSupported(): bool { return $this->innerWatcher->isSupported(); } diff --git a/src/Watcher/BufferedWatcher/BufferedWatcherProcess.php b/src/Watcher/BufferedWatcher/BufferedWatcherProcess.php index fb6e8d4..c33c425 100644 --- a/src/Watcher/BufferedWatcher/BufferedWatcherProcess.php +++ b/src/Watcher/BufferedWatcher/BufferedWatcherProcess.php @@ -2,11 +2,11 @@ namespace Phpactor\AmpFsWatch\Watcher\BufferedWatcher; -use Amp\Delayed; -use Amp\Promise; use Phpactor\AmpFsWatch\ModifiedFile; use Phpactor\AmpFsWatch\WatcherProcess; use Throwable; +use function Amp\async; +use function Amp\delay; class BufferedWatcherProcess implements WatcherProcess { @@ -28,9 +28,9 @@ public function __construct(WatcherProcess $innerProcess, int $interval = 500) $this->innerProcess = $innerProcess; $this->interval = $interval; - \Amp\asyncCall(function () { + async(function (): void { try { - while (null !== $modifiedFile = yield $this->innerProcess->wait()) { + while (null !== $modifiedFile = $this->innerProcess->wait()) { assert($modifiedFile instanceof ModifiedFile); $this->buffer[$modifiedFile->path()] = $modifiedFile; } @@ -47,23 +47,20 @@ public function stop(): void $this->innerProcess->stop(); } - - public function wait(): Promise + public function wait(): ?ModifiedFile { - return \Amp\call(function () { - while ($this->running || !empty($this->buffer || null !== $this->error)) { - if ($this->error) { - $error = $this->error; - $this->error = null; - throw $error; - } - if ($this->buffer) { - return array_shift($this->buffer); - } - yield new Delayed($this->interval); + while ($this->running || !empty($this->buffer || null !== $this->error)) { + if ($this->error) { + $error = $this->error; + $this->error = null; + throw $error; } + if ($this->buffer) { + return array_shift($this->buffer); + } + delay($this->interval / 1000); + } - return null; - }); + return null; } } diff --git a/src/Watcher/Fallback/FallbackWatcher.php b/src/Watcher/Fallback/FallbackWatcher.php index 802904b..456e817 100644 --- a/src/Watcher/Fallback/FallbackWatcher.php +++ b/src/Watcher/Fallback/FallbackWatcher.php @@ -2,13 +2,11 @@ namespace Phpactor\AmpFsWatch\Watcher\Fallback; -use Amp\Promise; -use Amp\Success; use Phpactor\AmpFsWatch\Watcher; +use Phpactor\AmpFsWatch\WatcherProcess; use Phpactor\AmpFsWatch\Watcher\Null\NullWatcher; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; -use function Amp\call; class FallbackWatcher implements Watcher { @@ -32,19 +30,17 @@ public function __construct(array $watchers, ?LoggerInterface $logger = null) } } - public function watch(): Promise + public function watch(): WatcherProcess { - return call(function () { - $watcher = (yield $this->resolveWatcher()); - $this->lastWatcherName = $watcher->describe(); + $watcher = $this->resolveWatcher(); + $this->lastWatcherName = $watcher->describe(); - return $watcher->watch(); - }); + return $watcher->watch(); } - public function isSupported(): Promise + public function isSupported(): bool { - return new Success(true); + return true; } @@ -62,28 +58,23 @@ private function add(Watcher $watcher): void $this->watchers[] = $watcher; } - /** - * @return Promise - */ - private function resolveWatcher(): Promise + private function resolveWatcher(): Watcher { - return call(function () { - $names = []; - foreach ($this->watchers as $watcher) { - if (!yield $watcher->isSupported()) { - $names[] = yield new Success($watcher->describe()); - continue; - } - - return $watcher; + $names = []; + foreach ($this->watchers as $watcher) { + if (!$watcher->isSupported()) { + $names[] = $watcher->describe(); + continue; } - $this->logger->warning(sprintf( - 'No supported watchers, tried "%s".', - implode('", "', $names) - )); + return $watcher; + } + + $this->logger->warning(sprintf( + 'No supported watchers, tried "%s".', + implode('", "', $names) + )); - return new NullWatcher(); - }); + return new NullWatcher(); } } diff --git a/src/Watcher/Find/FindWatcher.php b/src/Watcher/Find/FindWatcher.php index 835ede0..747126f 100644 --- a/src/Watcher/Find/FindWatcher.php +++ b/src/Watcher/Find/FindWatcher.php @@ -2,11 +2,9 @@ namespace Phpactor\AmpFsWatch\Watcher\Find; -use Amp\ByteStream\LineReader; -use Amp\Delayed; +use Amp\ByteStream\ReadableResourceStream; +use Amp\Pipeline\Pipeline; use Amp\Process\Process; -use Amp\Promise; -use Amp\Process\ProcessInputStream; use Phpactor\AmpFsWatch\ModifiedFile; use Phpactor\AmpFsWatch\ModifiedFileQueue; use Phpactor\AmpFsWatch\SystemDetector\CommandDetector; @@ -16,6 +14,9 @@ use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; use RuntimeException; + +use function Amp\ByteStream\splitLines; +use function Amp\async; use function Amp\delay; class FindWatcher implements Watcher, WatcherProcess @@ -44,49 +45,45 @@ public function __construct( $this->lastUpdateFile = $config->lastUpdateReferenceFile() ?: $this->createTempFile(); } - public function watch(): Promise + public function watch(): WatcherProcess { - return \Amp\call(function () { - $this->logger->info(sprintf( - 'Polling at interval of "%s" milliseconds for changes paths "%s"', - $this->config->pollInterval(), - implode('", "', $this->config->paths()) - )); + $this->logger->info(sprintf( + 'Polling at interval of "%s" milliseconds for changes paths "%s"', + $this->config->pollInterval(), + implode('", "', $this->config->paths()) + )); - $this->updateDateReference(); - $this->running = true; + $this->updateDateReference(); + $this->running = true; - yield delay(10); + async(function (): void { + delay(.01); - \Amp\asyncCall(function () { - while ($this->running) { - $searches = []; - foreach ($this->config->paths() as $path) { - $searches[] = $this->search($path); - } - yield \Amp\Promise\all($searches); - $this->updateDateReference(); - yield new Delayed($this->config->pollInterval()); + while ($this->running) { + foreach ($this->config->paths() as $path) { + $this->search($path); } - }); - - return $this; + $this->updateDateReference(); + delay($this->config->pollInterval() / 1000); + } }); + + return $this; } - public function wait(): Promise + public function wait(): ?ModifiedFile { - return \Amp\call(function () { - while ($this->running) { - $this->queue = $this->queue->compress(); + while ($this->running) { + $this->queue = $this->queue->compress(); - if ($next = $this->queue->dequeue()) { - return $next; - } - - yield new Delayed($this->config->pollInterval() / 2); + if ($next = $this->queue->dequeue()) { + return $next; } - }); + + delay($this->config->pollInterval() / 2000); + } + + return null; } public function stop(): void @@ -94,7 +91,7 @@ public function stop(): void $this->running = false; } - public function isSupported(): Promise + public function isSupported(): bool { return $this->commandDetector->commandExists('find'); } @@ -105,81 +102,69 @@ public function describe(): string return 'find (BSD/GNU)'; } - /** - * @return Promise - */ - private function search(string $path): Promise + private function search(string $path): void { - return \Amp\call(function () use ($path) { - $start = microtime(true); - $process = yield $this->startProcess($path); + $start = microtime(true); + $process = $this->startProcess($path); - $this->feedQueue($process->getStdout()); + $this->feedQueue($process->getStdout()); - $exitCode = yield $process->join(); - $stop = microtime(true); + $exitCode = $process->join(); + $stop = microtime(true); - $this->logger->debug(sprintf( - 'pid:%s Find process "%s" done in %s seconds', - getmypid(), - $process->getCommand(), - number_format($stop - $start, 2) - )); + $this->logger->debug(sprintf( + 'pid:%s Find process "%s" done in %s seconds', + getmypid(), + $process->getCommand(), + number_format($stop - $start, 2) + )); - if ($exitCode === 0) { - return; - } + if ($exitCode === 0) { + return; + } - $stderr = yield $process->getStderr()->read(); - $this->logger->error(sprintf( - 'Process "%s" exited with error code %s: %s', - $process->getCommand(), - $exitCode, - $stderr - )); - }); + $stderr = $process->getStderr()->read(); + $this->logger->error(sprintf( + 'Process "%s" exited with error code %s: %s', + $process->getCommand(), + $exitCode, + $stderr + )); } - private function feedQueue(ProcessInputStream $stream): void + private function feedQueue(ReadableResourceStream $stream): void { - \Amp\asyncCall(function () use ($stream) { - $reader = new LineReader($stream); - while (null !== $line = yield $reader->readLine()) { - $this->logger->debug('find found: ' . $line); - $this->queue->enqueue(new ModifiedFile($line, is_file($line) ? ModifiedFile::TYPE_FILE : ModifiedFile::TYPE_FOLDER)); - } - }); + $reader = Pipeline::fromIterable(splitLines($stream))->getIterator(); + + while (false !== $reader->continue()) { + $line = $reader->getValue(); + $this->logger->debug('find found: ' . $line); + $this->queue->enqueue(new ModifiedFile($line, is_file($line) ? ModifiedFile::TYPE_FILE : ModifiedFile::TYPE_FOLDER)); + } } - /** - * @return Promise - */ - private function startProcess(string $path): Promise + private function startProcess(string $path): Process { - return \Amp\call(function () use ($path) { - // use ctime (inode status change time) rather than modification - // time as vendor libraries (for example) preserve the modification - // times. - $process = new Process([ - 'find', - $path, - '-mindepth', - '1', - '-newercc', - $this->lastUpdateFile - ]); - - $pid = yield $process->start(); - - if (!$process->isRunning()) { - throw new RuntimeException(sprintf( - 'Could not start process: %s', - $process->getCommand() - )); - } + // use ctime (inode status change time) rather than modification + // time as vendor libraries (for example) preserve the modification + // times. + $process = Process::start([ + 'find', + $path, + '-mindepth', + '1', + '-newercc', + $this->lastUpdateFile, + ]); + + if (!$process->isRunning()) { + throw new RuntimeException(sprintf( + 'Could not start process: %s', + $process->getCommand() + )); + } - return $process; - }); + return $process; } private function updateDateReference(): void diff --git a/src/Watcher/FsWatch/FsWatchWatcher.php b/src/Watcher/FsWatch/FsWatchWatcher.php index 47396f5..25ae5ef 100644 --- a/src/Watcher/FsWatch/FsWatchWatcher.php +++ b/src/Watcher/FsWatch/FsWatchWatcher.php @@ -2,11 +2,10 @@ namespace Phpactor\AmpFsWatch\Watcher\FsWatch; -use Amp\ByteStream\LineReader; -use Amp\Delayed; +use Amp\Pipeline\Pipeline; use Amp\Process\Process; -use Amp\Process\StatusError; -use Amp\Promise; +use Amp\Process\ProcessException; +use Phpactor\AmpFsWatch\ModifiedFile; use Phpactor\AmpFsWatch\ModifiedFileQueue; use Phpactor\AmpFsWatch\SystemDetector\CommandDetector; use Phpactor\AmpFsWatch\ModifiedFileBuilder; @@ -17,6 +16,10 @@ use Psr\Log\NullLogger; use RuntimeException; +use function Amp\ByteStream\splitLines; +use function Amp\async; +use function Amp\delay; + class FsWatchWatcher implements Watcher, WatcherProcess { private const CMD = 'fswatch'; @@ -46,35 +49,31 @@ public function __construct( } - public function watch(): Promise + public function watch(): WatcherProcess { - return \Amp\call(function () { - $this->process = yield $this->startProcess(); - $this->running = true; - $this->feedQueue($this->process); - return $this; - }); + $this->process = $this->startProcess(); + $this->running = true; + $this->feedQueue($this->process); + return $this; } - public function wait(): Promise + public function wait(): ?ModifiedFile { - return \Amp\call(function () { - while (false === $this->process->isRunning()) { - yield new Delayed(self::POLL_TIME); - } - - while ($this->running) { - $this->queue = $this->queue->compress(); + while (false === $this->process->isRunning()) { + delay(self::POLL_TIME / 1000); + } - if ($next = $this->queue->dequeue()) { - return $next; - } + while ($this->running) { + $this->queue = $this->queue->compress(); - yield new Delayed(self::POLL_TIME); + if ($next = $this->queue->dequeue()) { + return $next; } - return null; - }); + delay(self::POLL_TIME / 1000); + } + + return null; } public function stop(): void @@ -87,11 +86,11 @@ public function stop(): void $this->running = false; try { $this->process->signal(SIGTERM); - } catch (StatusError) { + } catch (ProcessException) { } } - public function isSupported(): Promise + public function isSupported(): bool { return $this->commandDetector->commandExists(self::CMD); } @@ -102,40 +101,36 @@ public function describe(): string return 'fs-watch'; } - /** - * @return Promise - */ - private function startProcess(): Promise + private function startProcess(): Process { - return \Amp\call(function () { - $process = new Process(array_merge([ - self::CMD, - ], $this->config->paths(), [ + $process = Process::start(array_merge([ + self::CMD, + ], $this->config->paths(), [ '-r', '--event=Created', '--event=Updated', - '--event=Removed' + '--event=Removed', ])); - $pid = yield $process->start(); - $this->logger->debug(sprintf('Started "%s"', $process->getCommand())); + $this->logger->debug(sprintf('Started "%s"', $process->getCommand())); - if (!$process->isRunning()) { - throw new RuntimeException(sprintf( - 'Could not start process: %s', - $process->getCommand() - )); - } + if (!$process->isRunning()) { + throw new RuntimeException(sprintf( + 'Could not start process: %s', + $process->getCommand() + )); + } - return $process; - }); + return $process; } private function feedQueue(Process $process): void { - $reader = new LineReader($process->getStdout()); - \Amp\asyncCall(function () use ($reader) { - while (null !== $line = yield $reader->readLine()) { + $reader = Pipeline::fromIterable(splitLines($process->getStdout())) + ->getIterator(); + async(function () use ($reader): void { + while (false !== $reader->continue()) { + $line = $reader->getValue(); $builder = ModifiedFileBuilder::fromPath($line); if (file_exists($line) && !is_file($line)) { $builder->asFolder(); diff --git a/src/Watcher/Inotify/InotifyWatcher.php b/src/Watcher/Inotify/InotifyWatcher.php index 59b9304..202050a 100644 --- a/src/Watcher/Inotify/InotifyWatcher.php +++ b/src/Watcher/Inotify/InotifyWatcher.php @@ -2,11 +2,10 @@ namespace Phpactor\AmpFsWatch\Watcher\Inotify; -use Amp\ByteStream\LineReader; +use Amp\Pipeline\ConcurrentIterator; +use Amp\Pipeline\Pipeline; use Amp\Process\Process; -use Amp\Process\StatusError; -use Amp\Promise; -use Amp\Success; +use Amp\Process\ProcessException; use Phpactor\AmpFsWatch\Exception\WatcherDied; use Phpactor\AmpFsWatch\ModifiedFile; use Phpactor\AmpFsWatch\SystemDetector\CommandDetector; @@ -19,7 +18,9 @@ use Psr\Log\NullLogger; use RuntimeException; use Symfony\Component\Filesystem\Path; + use function Amp\ByteStream\buffer; +use function Amp\ByteStream\splitLines; class InotifyWatcher implements Watcher, WatcherProcess { @@ -35,7 +36,10 @@ class InotifyWatcher implements Watcher, WatcherProcess private WatcherConfig $config; - private LineReader $lineReader; + /** + * @var ConcurrentIterator + */ + private ConcurrentIterator $lineReader; /** * @var array @@ -54,58 +58,56 @@ public function __construct( $this->config = $config; } - public function watch(): Promise + public function watch(): WatcherProcess { - return \Amp\call(function () { - $this->process = yield $this->startProcess(); - $this->lineReader = new LineReader($this->process->getStdout()); + $this->process = $this->startProcess(); + $this->lineReader = Pipeline::fromIterable(splitLines($this->process->getStdout()))->getIterator(); - return $this; - }); + return $this; } - public function wait(): Promise + public function wait(): ?ModifiedFile { - return \Amp\call(function () { - while (null !== $file = array_shift($this->directoryBuffer)) { - return $file; - } - - if (null === $line = yield $this->lineReader->readLine()) { - $exitCode = yield $this->process->join(); + while (null !== $file = array_shift($this->directoryBuffer)) { + return $file; + } - // probably ran out of watchers, throw an error which can be - // handled downstream. - if ($exitCode === 1) { - throw new WatcherDied(sprintf( - 'Inotify exited with status code "%s": %s', - $exitCode, - yield buffer($this->process->getStderr()) - )); - } + if (false === $this->lineReader->continue()) { + $exitCode = $this->process->join(); - return null; + // probably ran out of watchers, throw an error which can be + // handled downstream. + if ($exitCode === 1) { + throw new WatcherDied(sprintf( + 'Inotify exited with status code "%s": %s', + $exitCode, + buffer($this->process->getStderr()) + )); } - $event = InotifyEvent::createFromCsv($line); + return null; + } - $builder = ModifiedFileBuilder::fromPathSegments( - $event->watchedFileName(), - $event->eventFilename() - ); + $line = $this->lineReader->getValue(); - if ($event->hasEventName('ISDIR')) { - $builder = $builder->asFolder(); - } + $event = InotifyEvent::createFromCsv($line); - $modifiedFile = $builder->build(); + $builder = ModifiedFileBuilder::fromPathSegments( + $event->watchedFileName(), + $event->eventFilename() + ); - if ($event->hasEventName('MOVED_TO') && $modifiedFile->type() === ModifiedFile::TYPE_FOLDER) { - yield $this->enqueueDirectory($modifiedFile->path()); - } + if ($event->hasEventName('ISDIR')) { + $builder = $builder->asFolder(); + } + + $modifiedFile = $builder->build(); + + if ($event->hasEventName('MOVED_TO') && $modifiedFile->type() === ModifiedFile::TYPE_FOLDER) { + $this->enqueueDirectory($modifiedFile->path()); + } - return $modifiedFile; - }); + return $modifiedFile; } public function stop(): void @@ -118,14 +120,14 @@ public function stop(): void try { $this->process->signal(SIGTERM); - } catch (StatusError) { + } catch (ProcessException) { } } - public function isSupported(): Promise + public function isSupported(): bool { if (!$this->osDetector->isLinux()) { - return new Success(false); + return false; } return $this->commandDetector->commandExists(self::INOTIFY_CMD); @@ -137,60 +139,49 @@ public function describe(): string return 'inotify'; } - /** - * @return Promise - */ - private function startProcess(): Promise + private function startProcess(): Process { - return \Amp\call(function () { - $process = new Process(array_merge([ - self::INOTIFY_CMD, - '-r', - '-emodify,create,delete,move', - '--monitor', - '--csv', - ], $this->config->paths())); - - $pid = yield $process->start(); - $this->logger->debug(sprintf('Started "%s"', $process->getCommand())); - - if (!$process->isRunning()) { - throw new WatcherDied(sprintf( - 'Could not start process: %s', - $process->getCommand() - )); - } + $process = Process::start(array_merge([ + self::INOTIFY_CMD, + '-r', + '-emodify,create,delete,move', + '--monitor', + '--csv', + ], $this->config->paths())); + + $this->logger->debug(sprintf('Started "%s" (PID %s)', $process->getCommand(), $process->getPid())); + + if (!$process->isRunning()) { + throw new WatcherDied(sprintf( + 'Could not start process: %s', + $process->getCommand() + )); + } - return $process; - }); + return $process; } - /** - * @return Promise - */ - private function enqueueDirectory(string $path): Promise + private function enqueueDirectory(string $path): void { - return \Amp\call(function () use ($path) { - $files = scandir($path); - foreach ((array)$files as $file) { - if (false === $file || $file === '.' || $file === '..') { - continue; - } - - $filePath = Path::join($path, $file); - $isDir = is_dir($filePath); - $file = ModifiedFileBuilder::fromPath($filePath); - - if ($isDir) { - $file = $file->asFolder(); - } - - $this->directoryBuffer[] = $file->build(); - - if ($isDir) { - yield $this->enqueueDirectory($filePath); - } + $files = scandir($path); + foreach ((array)$files as $file) { + if (false === $file || $file === '.' || $file === '..') { + continue; } - }); + + $filePath = Path::join($path, $file); + $isDir = is_dir($filePath); + $file = ModifiedFileBuilder::fromPath($filePath); + + if ($isDir) { + $file = $file->asFolder(); + } + + $this->directoryBuffer[] = $file->build(); + + if ($isDir) { + $this->enqueueDirectory($filePath); + } + } } } diff --git a/src/Watcher/Null/NullWatcher.php b/src/Watcher/Null/NullWatcher.php index a9cc78c..f8167cd 100644 --- a/src/Watcher/Null/NullWatcher.php +++ b/src/Watcher/Null/NullWatcher.php @@ -2,36 +2,29 @@ namespace Phpactor\AmpFsWatch\Watcher\Null; -use Amp\Promise; -use Amp\Success; +use Phpactor\AmpFsWatch\ModifiedFile; use Phpactor\AmpFsWatch\WatcherProcess; - use Phpactor\AmpFsWatch\Watcher; class NullWatcher implements Watcher, WatcherProcess { - - public function watch(): Promise + public function watch(): WatcherProcess { - return \Amp\call(function () { - return $this; - }); + return $this; } - public function isSupported(): Promise + public function isSupported(): bool { - return new Success(true); + return true; } public function stop(): void { } - public function wait(): Promise + public function wait(): ?ModifiedFile { - return \Amp\call(function () { - return null; - }); + return null; } diff --git a/src/Watcher/PatternMatching/PatternMatchingWatcher.php b/src/Watcher/PatternMatching/PatternMatchingWatcher.php index f266117..8c60688 100644 --- a/src/Watcher/PatternMatching/PatternMatchingWatcher.php +++ b/src/Watcher/PatternMatching/PatternMatchingWatcher.php @@ -2,8 +2,8 @@ namespace Phpactor\AmpFsWatch\Watcher\PatternMatching; -use Amp\Promise; use Phpactor\AmpFsWatch\Watcher; +use Phpactor\AmpFsWatch\WatcherProcess; class PatternMatchingWatcher implements Watcher { @@ -30,15 +30,13 @@ public function __construct(Watcher $innerWatcher, array $includePatterns, array $this->excludePatterns = $excludePatterns; } - public function watch(): Promise + public function watch(): WatcherProcess { - return \Amp\call(function () { - $process = yield $this->innerWatcher->watch(); - return new PatternWatcherProcess($process, $this->includePatterns, $this->excludePatterns); - }); + $process = $this->innerWatcher->watch(); + return new PatternWatcherProcess($process, $this->includePatterns, $this->excludePatterns); } - public function isSupported(): Promise + public function isSupported(): bool { return $this->innerWatcher->isSupported(); } diff --git a/src/Watcher/PatternMatching/PatternWatcherProcess.php b/src/Watcher/PatternMatching/PatternWatcherProcess.php index f397c3b..460f615 100644 --- a/src/Watcher/PatternMatching/PatternWatcherProcess.php +++ b/src/Watcher/PatternMatching/PatternWatcherProcess.php @@ -2,7 +2,7 @@ namespace Phpactor\AmpFsWatch\Watcher\PatternMatching; -use Amp\Promise; +use Phpactor\AmpFsWatch\ModifiedFile; use Phpactor\AmpFsWatch\WatcherProcess; class PatternWatcherProcess implements WatcherProcess @@ -39,24 +39,24 @@ public function stop(): void } - public function wait(): Promise + public function wait(): ?ModifiedFile { - return \Amp\call(function () { - while (null !== $file = yield $this->process->wait()) { - foreach ($this->includePatterns as $pattern) { - if (false === $this->matcher->matches($file->path(), $pattern)) { - continue 2; - } + while (null !== $file = $this->process->wait()) { + foreach ($this->includePatterns as $pattern) { + if (false === $this->matcher->matches($file->path(), $pattern)) { + continue 2; } + } - foreach ($this->excludePatterns as $pattern) { - if (true === $this->matcher->matches($file->path(), $pattern)) { - continue 2; - } + foreach ($this->excludePatterns as $pattern) { + if (true === $this->matcher->matches($file->path(), $pattern)) { + continue 2; } - - return $file; } - }); + + return $file; + } + + return null; } } diff --git a/src/Watcher/PhpPollWatcher/PhpPollWatcher.php b/src/Watcher/PhpPollWatcher/PhpPollWatcher.php index 168a3e0..ffc0f1b 100644 --- a/src/Watcher/PhpPollWatcher/PhpPollWatcher.php +++ b/src/Watcher/PhpPollWatcher/PhpPollWatcher.php @@ -2,9 +2,6 @@ namespace Phpactor\AmpFsWatch\Watcher\PhpPollWatcher; -use Amp\Delayed; -use Amp\Promise; -use Amp\Success; use DateTimeImmutable; use Phpactor\AmpFsWatch\ModifiedFile; use Phpactor\AmpFsWatch\ModifiedFileQueue; @@ -15,6 +12,9 @@ use Psr\Log\NullLogger; use Symfony\Component\Filesystem\Path; +use function Amp\async; +use function Amp\delay; + class PhpPollWatcher implements Watcher, WatcherProcess { private LoggerInterface $logger; @@ -36,59 +36,55 @@ public function __construct( $this->config = $config; } - public function watch(): Promise + public function watch(): WatcherProcess { - return \Amp\call(function () { - $this->logger->info(sprintf( - 'Polling at interval of "%s" milliseconds for changes paths "%s"', - $this->config->pollInterval(), - implode('", "', $this->config->paths()) - )); - - $this->updateDateReference(); - $this->running = true; - - \Amp\asyncCall(function () { - while ($this->running) { - $start = microtime(true); - $searches = []; + $this->logger->info(sprintf( + 'Polling at interval of "%s" milliseconds for changes paths "%s"', + $this->config->pollInterval(), + implode('", "', $this->config->paths()) + )); - foreach ($this->config->paths() as $path) { - $searches[] = $this->search($path); - } + $this->updateDateReference(); + $this->running = true; - yield \Amp\Promise\all($searches); + async(function (): void { + while ($this->running) { + $start = microtime(true); + $searches = []; - $this->logger->debug(sprintf( - 'pid: %s PHP watcher scanned paths "%s" in %s seconds', - getmypid(), - implode('", "', $this->config->paths()), - number_format(microtime(true) - $start, 2) - )); + foreach ($this->config->paths() as $path) { + $this->search($path); + } - $this->updateDateReference(); + $this->logger->debug(sprintf( + 'pid: %s PHP watcher scanned paths "%s" in %s seconds', + getmypid(), + implode('", "', $this->config->paths()), + number_format(microtime(true) - $start, 2) + )); - yield new Delayed($this->config->pollInterval()); - } - }); + $this->updateDateReference(); - return $this; + delay($this->config->pollInterval() / 1000); + } }); + + return $this; } - public function wait(): Promise + public function wait(): ?ModifiedFile { - return \Amp\call(function () { - while ($this->running) { - $this->queue = $this->queue->compress(); - - if ($next = $this->queue->dequeue()) { - return $next; - } + while ($this->running) { + $this->queue = $this->queue->compress(); - yield new Delayed($this->config->pollInterval() / 2); + if ($next = $this->queue->dequeue()) { + return $next; } - }); + + delay($this->config->pollInterval() / 2000); + } + + return null; } public function stop(): void @@ -96,9 +92,9 @@ public function stop(): void $this->running = false; } - public function isSupported(): Promise + public function isSupported(): bool { - return new Success(true); + return true; } @@ -107,36 +103,31 @@ public function describe(): string return 'php-poll'; } - /** - * @return Promise - */ - private function search(string $path): Promise + private function search(string $path): void { - return \Amp\call(function () use ($path) { - $files = scandir($path); - foreach ((array)$files as $file) { - if (false === $file || $file === '.' || $file === '..') { - continue; - } - $filePath = Path::join($path, $file); - clearstatcache(); - $mtime = filectime($filePath); - $isDir = is_dir($filePath); - - - // we are only accurate to seconds, so accept also - // if mtime is the same as current timestamp - if ($mtime >= $this->lastUpdate->format('U')) { - $this->queue->enqueue( - new ModifiedFile($filePath, $isDir ? ModifiedFile::TYPE_FOLDER : ModifiedFile::TYPE_FILE) - ); - } + $files = scandir($path); + foreach ((array)$files as $file) { + if (false === $file || $file === '.' || $file === '..') { + continue; + } + $filePath = Path::join($path, $file); + clearstatcache(); + $mtime = filectime($filePath); + $isDir = is_dir($filePath); + + + // we are only accurate to seconds, so accept also + // if mtime is the same as current timestamp + if ($mtime >= $this->lastUpdate->format('U')) { + $this->queue->enqueue( + new ModifiedFile($filePath, $isDir ? ModifiedFile::TYPE_FOLDER : ModifiedFile::TYPE_FILE) + ); + } - if ($isDir) { - yield $this->search($filePath); - } + if ($isDir) { + $this->search($filePath); } - }); + } } diff --git a/src/Watcher/TestWatcher/TestWatcher.php b/src/Watcher/TestWatcher/TestWatcher.php index 23b996f..311a971 100644 --- a/src/Watcher/TestWatcher/TestWatcher.php +++ b/src/Watcher/TestWatcher/TestWatcher.php @@ -2,14 +2,14 @@ namespace Phpactor\AmpFsWatch\Watcher\TestWatcher; -use Amp\Delayed; -use Amp\Promise; -use Amp\Success; use Exception; +use Phpactor\AmpFsWatch\ModifiedFile; use Phpactor\AmpFsWatch\ModifiedFileQueue; use Phpactor\AmpFsWatch\Watcher; use Phpactor\AmpFsWatch\WatcherProcess; +use function Amp\delay; + class TestWatcher implements Watcher, WatcherProcess { private ModifiedFileQueue $queue; @@ -25,41 +25,37 @@ public function __construct(ModifiedFileQueue $queue, int $delay = 0, ?Exception $this->error = $error; } - public function watch(): Promise + public function watch(): WatcherProcess { - return new Success($this); + return $this; } - public function isSupported(): Promise + public function isSupported(): bool { - return new Success(true); + return true; } public function stop(): void { } - - public function wait(): Promise + public function wait(): ?ModifiedFile { - return \Amp\call(function () { - if ($this->delay) { - yield new Delayed($this->delay); - } + if ($this->delay) { + delay($this->delay / 1000); + } - if ($this->error) { - throw $this->error; - } + if ($this->error) { + throw $this->error; + } - while (null !== $file = $this->queue->dequeue()) { - return $file; - } + while (null !== $file = $this->queue->dequeue()) { + return $file; + } - return null; - }); + return null; } - public function describe(): string { return 'test'; diff --git a/src/Watcher/Watchman/WatchmanWatcher.php b/src/Watcher/Watchman/WatchmanWatcher.php index 181b8a0..3f9030f 100644 --- a/src/Watcher/Watchman/WatchmanWatcher.php +++ b/src/Watcher/Watchman/WatchmanWatcher.php @@ -2,10 +2,11 @@ namespace Phpactor\AmpFsWatch\Watcher\Watchman; -use Amp\ByteStream\LineReader; +use Amp\Future; +use Amp\Pipeline\ConcurrentIterator; +use Amp\Pipeline\Pipeline; use Amp\Process\Process; -use Amp\Process\StatusError; -use Amp\Promise; +use Amp\Process\ProcessException; use Phpactor\AmpFsWatch\Exception\WatcherDied; use Phpactor\AmpFsWatch\ModifiedFile; use Phpactor\AmpFsWatch\SystemDetector\CommandDetector; @@ -16,9 +17,11 @@ use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; use RuntimeException; + use function Amp\ByteStream\buffer; -use function Amp\Promise\first; -use function Amp\call; +use function Amp\ByteStream\splitLines; +use function Amp\Future\awaitAny; +use function Amp\async; class WatchmanWatcher implements Watcher, WatcherProcess { @@ -36,14 +39,14 @@ class WatchmanWatcher implements Watcher, WatcherProcess private WatcherConfig $config; /** - * @var LineReader[] + * @var ConcurrentIterator[] */ private array $lineReaders = []; /** - * @var array> + * @var array> */ - private array $lineReaderPromises = []; + private array $lineReaderFutures = []; /** * @var array @@ -60,61 +63,59 @@ public function __construct( $this->config = $config; } - public function watch(): Promise + public function watch(): WatcherProcess { - return call(function () { - yield $this->watchPaths(); + $this->watchPaths(); - foreach ($this->config->paths() as $path) { - $subscriber = yield $this->subscribe($path); - $this->subscribers[] = $subscriber; - $this->lineReaders[] = new LineReader($subscriber->getStdout()); - } + foreach ($this->config->paths() as $path) { + $subscriber = $this->subscribe($path); + $this->subscribers[] = $subscriber; + $this->lineReaders[] = Pipeline::fromIterable(splitLines($subscriber->getStdout()))->getIterator(); + } - return $this; - }); + return $this; } - public function wait(): Promise + public function wait(): ?ModifiedFile { - return call(function () { - while (null !== $file = array_shift($this->fileBuffer)) { - return $file; - } + while (null !== $file = array_shift($this->fileBuffer)) { + return $file; + } - while (null !== $line = yield $this->readLine()) { - $notification = json_decode($line, true); + while (null !== $line = $this->readLine()) { + $notification = json_decode($line, true); - if (false === $notification) { - throw new RuntimeException(sprintf( - 'Could not decode JSON from watchman: %s %s', - $line, - json_last_error_msg() - )); + if (false === $notification) { + throw new RuntimeException(sprintf( + 'Could not decode JSON from watchman: %s %s', + $line, + json_last_error_msg() + )); + } + + $files = array_map(function (array $file) use ($notification) { + $modifiedFile = ModifiedFileBuilder::fromPathSegments( + $notification['root'], + $file['name'] + ); + if ($file['type'] === 'd') { + $modifiedFile = $modifiedFile->asFolder(); } - $files = array_map(function (array $file) use ($notification) { - $modifiedFile = ModifiedFileBuilder::fromPathSegments( - $notification['root'], - $file['name'] - ); - if ($file['type'] === 'd') { - $modifiedFile = $modifiedFile->asFolder(); - } + return $modifiedFile->build(); + }, $notification['files'] ?? []); - return $modifiedFile->build(); - }, $notification['files'] ?? []); + if (empty($files)) { + continue; + } - if (empty($files)) { - continue; - } + $file = array_shift($files); + $this->fileBuffer = array_merge($this->fileBuffer, $files); - $file = array_shift($files); - $this->fileBuffer = array_merge($this->fileBuffer, $files); + return $file; + }; - return $file; - }; - }); + return null; } public function stop(): void @@ -122,12 +123,12 @@ public function stop(): void foreach ($this->subscribers as $subscriber) { try { $subscriber->signal(SIGTERM); - } catch (StatusError) { + } catch (ProcessException) { } } } - public function isSupported(): Promise + public function isSupported(): bool { return $this->commandDetector->commandExists(self::WATCHMAN_CMD); } @@ -138,130 +139,119 @@ public function describe(): string return 'watchman'; } - /** - * @return Promise - */ - private function watchPaths(): Promise - { - return call(function () { - foreach ($this->config->paths() as $path) { - $process = new Process([ - self::WATCHMAN_CMD, - 'watch', - $path - ]); - - - $pid = yield $process->start(); - $this->logger->debug(sprintf('Watchman: %s', $process->getCommand())); - $exit = yield $process->join(); - - if ($exit !== 0) { - throw new RuntimeException(sprintf( - 'Watchman exited with code "%s": %s ', - $exit, - yield buffer($process->getStderr()) - )); - } - } - }); - } - - /** - * @return Promise - */ - private function subscribe(string $path): Promise + private function watchPaths(): void { - return call(function () use ($path) { - $process = new Process([ + foreach ($this->config->paths() as $path) { + $process = Process::start([ self::WATCHMAN_CMD, - '-j', - '-p', - '--no-pretty' - ]); - $this->logger->debug(sprintf('Watchman: %s', $process->getCommand())); - - $pid = yield $process->start(); - $payload = (string)json_encode([ - 'subscribe', + 'watch', $path, - 'ampfs-watch', - [ - 'expression' => [ - 'allof', - [ - 'anyof', - ['type', 'f'], - ['type', 'd'] - ], - [ - 'since', - time(), - 'ctime' - ], - ], - 'fields' => [ - 'name','type', - ], - ] ]); - $this->logger->debug(sprintf('Watchman: %s', $payload)); - yield $process->getStdin()->write($payload); - if (!$process->isRunning()) { - throw new WatcherDied(sprintf( - 'Could not start process: %s', - $process->getCommand() + + $this->logger->debug(sprintf('Watchman: %s', $process->getCommand())); + $exit = $process->join(); + + if ($exit !== 0) { + throw new RuntimeException(sprintf( + 'Watchman exited with code "%s": %s ', + $exit, + buffer($process->getStderr()) )); } - - return $process; - }); + } } - /** - * @return Promise - */ - private function readLine(): Promise + private function subscribe(string $path): Process { - return call(function () { - foreach ($this->lineReaders as $index => $lineReader) { - if (array_key_exists((int)$index, $this->lineReaderPromises)) { - continue; - } + $process = Process::start([ + self::WATCHMAN_CMD, + '-j', + '-p', + '--no-pretty', + ]); + $this->logger->debug(sprintf('Watchman: %s', $process->getCommand())); + + $payload = (string)json_encode([ + 'subscribe', + $path, + 'ampfs-watch', + [ + 'expression' => [ + 'allof', + [ + 'anyof', + ['type', 'f'], + ['type', 'd'], + ], + [ + 'since', + time(), + 'ctime', + ], + ], + 'fields' => [ + 'name','type', + ], + ], + ]); + $this->logger->debug(sprintf('Watchman: %s', $payload)); + $process->getStdin()->write($payload); + + if (!$process->isRunning()) { + throw new WatcherDied(sprintf( + 'Could not start process: %s', + $process->getCommand() + )); + } + + return $process; + } - $this->lineReaderPromises[(int)$index] = call(function (int $index, LineReader $lineReader) { - $line = yield $lineReader->readLine(); - return [$index, $line]; - }, $index, $lineReader); + private function readLine(): ?string + { + foreach ($this->lineReaders as $index => $lineReader) { + if (array_key_exists((int)$index, $this->lineReaderFutures)) { + continue; } - [$index, $line] = yield first($this->lineReaderPromises); - unset($this->lineReaderPromises[(int)$index]); - $this->logger->debug(print_r($line, true)); + $this->lineReaderFutures[(int)$index] = async( + function (int $index, ConcurrentIterator $lineReader): array { + if (false === $lineReader->continue()) { + return [$index, null]; + } + return [$index, $lineReader->getValue()]; + }, + $index, + $lineReader, + ); + } + + [$index, $line] = awaitAny($this->lineReaderFutures); + unset($this->lineReaderFutures[(int)$index]); + $this->logger->debug(print_r($line, true)); + + if (null !== $line) { + return $line; + } - if (null !== $line) { - return $line; + foreach ($this->subscribers as $subscriber) { + if ($subscriber->isRunning()) { + continue; } + $exitCode = $subscriber->join(); - foreach ($this->subscribers as $subscriber) { - if ($subscriber->isRunning()) { - continue; - } - $exitCode = yield $subscriber->join(); - - // probably ran out of watchers, throw an error which can be - // handled downstream. - if ($exitCode === 1) { - throw new WatcherDied(sprintf( - 'Watchman subscriber exited with status code "%s": %s', - $exitCode, - yield buffer($subscriber->getStderr()) - )); - } + // probably ran out of watchers, throw an error which can be + // handled downstream. + if ($exitCode === 1) { + throw new WatcherDied(sprintf( + 'Watchman subscriber exited with status code "%s": %s', + $exitCode, + buffer($subscriber->getStderr()) + )); } + } - return null; - }); + return null; } } diff --git a/src/WatcherProcess.php b/src/WatcherProcess.php index 0540a20..ba5c230 100644 --- a/src/WatcherProcess.php +++ b/src/WatcherProcess.php @@ -2,14 +2,9 @@ namespace Phpactor\AmpFsWatch; -use Amp\Promise; - interface WatcherProcess { public function stop(): void; - /** - * @return Promise - */ - public function wait(): Promise; + public function wait(): ?ModifiedFile; } diff --git a/tests/Watcher/BufferedWatcher/BufferedWatcherTest.php b/tests/Watcher/BufferedWatcher/BufferedWatcherTest.php index a733486..1df8fe1 100644 --- a/tests/Watcher/BufferedWatcher/BufferedWatcherTest.php +++ b/tests/Watcher/BufferedWatcher/BufferedWatcherTest.php @@ -2,18 +2,17 @@ namespace Phpactor\AmpFsWatcher\Tests\Watcher\BufferedWatcher; -use Amp\Delayed; use Amp\PHPUnit\AsyncTestCase; -use Generator; use Phpactor\AmpFsWatch\ModifiedFile; use Phpactor\AmpFsWatch\ModifiedFileQueue; use Phpactor\AmpFsWatch\Watcher\BufferedWatcher\BufferedWatcher; use Phpactor\AmpFsWatch\Watcher\TestWatcher\TestWatcher; use RuntimeException; +use function Amp\delay; class BufferedWatcherTest extends AsyncTestCase { - public function testBufferedWatcher(): Generator + public function testBufferedWatcher(): void { $expectedFile1 = new ModifiedFile('/foo', ModifiedFile::TYPE_FILE); $expectedFile2 = new ModifiedFile('/bar', ModifiedFile::TYPE_FILE); @@ -22,15 +21,15 @@ public function testBufferedWatcher(): Generator $expectedFile1, ]); $bufferedWatcher = new BufferedWatcher(new TestWatcher($queue), 100); - $process = yield $bufferedWatcher->watch(); - $file1 = yield $process->wait(); - $file2 = yield $process->wait(); + $process = $bufferedWatcher->watch(); + $file1 = $process->wait(); + $file2 = $process->wait(); self::assertSame($expectedFile1, $file1); self::assertSame($expectedFile2, $file2); } - public function testDeduplicatesFiles(): Generator + public function testDeduplicatesFiles(): void { $expectedFile1 = new ModifiedFile('/foo', ModifiedFile::TYPE_FILE); $expectedFile2 = new ModifiedFile('/foo', ModifiedFile::TYPE_FILE); @@ -43,30 +42,30 @@ public function testDeduplicatesFiles(): Generator $expectedFile1, ]); $bufferedWatcher = new BufferedWatcher(new TestWatcher($queue), 100); - $process = yield $bufferedWatcher->watch(); - $file1 = yield $process->wait(); - $file2 = yield $process->wait(); + $process = $bufferedWatcher->watch(); + $file1 = $process->wait(); + $file2 = $process->wait(); self::assertSame($expectedFile2, $file1); self::assertSame($expectedFile4, $file2); } - public function testReturnsNulLWhenInnerWatcherStops(): Generator + public function testReturnsNulLWhenInnerWatcherStops(): void { $expectedFile1 = new ModifiedFile('/foo', ModifiedFile::TYPE_FILE); $queue = new ModifiedFileQueue([ $expectedFile1, ]); $bufferedWatcher = new BufferedWatcher(new TestWatcher($queue), 100); - $process = yield $bufferedWatcher->watch(); - $file1 = yield $process->wait(); - $file2 = yield $process->wait(); + $process = $bufferedWatcher->watch(); + $file1 = $process->wait(); + $file2 = $process->wait(); self::assertSame($expectedFile1, $file1); self::assertNull($file2); } - public function testErrorsBubbleUp(): Generator + public function testErrorsBubbleUp(): void { $this->expectExceptionMessage('sorry'); $expectedFile1 = new ModifiedFile('/foo', ModifiedFile::TYPE_FILE); @@ -74,8 +73,8 @@ public function testErrorsBubbleUp(): Generator $expectedFile1, ]); $bufferedWatcher = new BufferedWatcher(new TestWatcher($queue, 100, new RuntimeException('sorry')), 10); - $process = yield $bufferedWatcher->watch(); - yield new Delayed(100); - yield $process->wait(); + $process = $bufferedWatcher->watch(); + delay(0.1); + $process->wait(); } } diff --git a/tests/Watcher/Fallback/FallbackWatcherTest.php b/tests/Watcher/Fallback/FallbackWatcherTest.php index 5d6c560..e10decc 100644 --- a/tests/Watcher/Fallback/FallbackWatcherTest.php +++ b/tests/Watcher/Fallback/FallbackWatcherTest.php @@ -3,8 +3,6 @@ namespace Phpactor\AmpFsWatcher\Tests\Watcher\Fallback; use Amp\PHPUnit\AsyncTestCase; -use Amp\Success; -use Generator; use Phpactor\AmpFsWatch\Watcher; use Phpactor\AmpFsWatch\Watcher\Fallback\FallbackWatcher; use Phpactor\AmpFsWatch\Watcher\Null\NullWatcher; @@ -16,10 +14,7 @@ class FallbackWatcherTest extends AsyncTestCase { use \Prophecy\PhpUnit\ProphecyTrait; - /** - * @var ObjectProphecy|LoggerInterface - */ - private $logger; + private ObjectProphecy|LoggerInterface $logger; private ObjectProphecy $watcher1; @@ -44,9 +39,9 @@ public function testNameIsUnknownWhenCalledBeforeInitialization(): void self::assertEquals('unknown (pending invocation)', $watcher->describe()); } - public function testUsesFirstSupportedWatcher() + public function testUsesFirstSupportedWatcher(): void { - $this->watcher1->isSupported()->willReturn(new Success(false)); + $this->watcher1->isSupported()->willReturn(false); $callback = function (): void { }; @@ -56,24 +51,24 @@ public function testUsesFirstSupportedWatcher() $watcher = $this->createWatcher([ $this->watcher1->reveal(), - $nullWatcher + $nullWatcher, ]); - $process = yield $watcher->watch($paths, $callback); + $process = $watcher->watch($paths, $callback); self::assertSame($nullWatcher, $process); self::assertEquals('null', $watcher->describe()); } - public function testReturnsNullWatcherAndLogsWarningIfNoSupportedWatchers() + public function testReturnsNullWatcherAndLogsWarningIfNoSupportedWatchers(): void { - $this->watcher1->isSupported()->willReturn(new Success(false)); - $this->watcher2->isSupported()->willReturn(new Success(false)); + $this->watcher1->isSupported()->willReturn(false); + $this->watcher2->isSupported()->willReturn(false); $callback = function (): void { }; $paths = ['path1']; - $process = yield $this->createWatcher([ + $process = $this->createWatcher([ $this->watcher1->reveal(), $this->watcher2->reveal(), ])->watch($paths, $callback); @@ -83,10 +78,10 @@ public function testReturnsNullWatcherAndLogsWarningIfNoSupportedWatchers() self::assertInstanceOf(NullWatcher::class, $process); } - public function testIsAlwaysSupported(): Generator + public function testIsAlwaysSupported(): void { $watcher = $this->createWatcher([]); - self::assertTrue(yield $watcher->isSupported()); + self::assertTrue($watcher->isSupported()); } private function createWatcher(array $watchers): Watcher diff --git a/tests/Watcher/Find/FindWatcherTest.php b/tests/Watcher/Find/FindWatcherTest.php index 2f360f1..7e35eac 100644 --- a/tests/Watcher/Find/FindWatcherTest.php +++ b/tests/Watcher/Find/FindWatcherTest.php @@ -2,8 +2,6 @@ namespace Phpactor\AmpFsWatcher\Tests\Watcher\Find; -use Amp\Success; -use Generator; use Phpactor\AmpFsWatch\SystemDetector\CommandDetector; use Phpactor\AmpFsWatch\Watcher; use Phpactor\AmpFsWatch\WatcherConfig; @@ -21,28 +19,28 @@ class FindWatcherTest extends WatcherTestCase */ private ObjectProphecy $commandDetector; - public function testRemoval(): Generator + public function testRemoval(): void { $this->markTestSkipped('Not supported'); } - public function testIsSupported(): Generator + public function testIsSupported(): void { $watcher = $this->createWatcher(new WatcherConfig([])); - self::assertTrue(yield $watcher->isSupported()); + self::assertTrue($watcher->isSupported()); } - public function testIsNotSupportedIfFindNotFound(): Generator + public function testIsNotSupportedIfFindNotFound(): void { $watcher = $this->createWatcher(new WatcherConfig([])); - $this->commandDetector->commandExists('find')->willReturn(new Success(false)); - self::assertFalse(yield $watcher->isSupported()); + $this->commandDetector->commandExists('find')->willReturn(false); + self::assertFalse($watcher->isSupported()); } protected function createWatcher(WatcherConfig $config): Watcher { $this->commandDetector = $this->prophesize(CommandDetector::class); - $this->commandDetector->commandExists('find')->willReturn(new Success(true)); + $this->commandDetector->commandExists('find')->willReturn(true); return new FindWatcher( $config->withPollInterval(100), diff --git a/tests/Watcher/FsWatch/FsWatchWatcherTest.php b/tests/Watcher/FsWatch/FsWatchWatcherTest.php index 0fc076f..a4d0bb4 100644 --- a/tests/Watcher/FsWatch/FsWatchWatcherTest.php +++ b/tests/Watcher/FsWatch/FsWatchWatcherTest.php @@ -2,8 +2,6 @@ namespace Phpactor\AmpFsWatcher\Tests\Watcher\FsWatch; -use Amp\Success; -use Generator; use Phpactor\AmpFsWatch\SystemDetector\CommandDetector; use Phpactor\AmpFsWatch\Watcher; use Phpactor\AmpFsWatch\WatcherConfig; @@ -24,19 +22,19 @@ protected function setUp(): void $this->commandDetector->commandExists('fswatch')->willReturn(true); } - public function testIsSupported(): Generator + public function testIsSupported(): void { $watcher = $this->createWatcher(new WatcherConfig([])); - $this->commandDetector->commandExists('fswatch')->willReturn(new Success(true)); + $this->commandDetector->commandExists('fswatch')->willReturn(true); - self::assertTrue(yield $watcher->isSupported()); + self::assertTrue($watcher->isSupported()); } - public function testNotSupportedIfCommandNotFound(): Generator + public function testNotSupportedIfCommandNotFound(): void { $watcher = $this->createWatcher(new WatcherConfig([])); - $this->commandDetector->commandExists('fswatch')->willReturn(new Success(false)); - self::assertFalse(yield $watcher->isSupported()); + $this->commandDetector->commandExists('fswatch')->willReturn(false); + self::assertFalse($watcher->isSupported()); } protected function createWatcher(WatcherConfig $config): Watcher diff --git a/tests/Watcher/Inotify/InotifyWatcherTest.php b/tests/Watcher/Inotify/InotifyWatcherTest.php index bfe7562..e5dd7df 100644 --- a/tests/Watcher/Inotify/InotifyWatcherTest.php +++ b/tests/Watcher/Inotify/InotifyWatcherTest.php @@ -2,9 +2,6 @@ namespace Phpactor\AmpFsWatcher\Tests\Watcher\Inotify; -use Amp\Delayed; -use Amp\Success; -use Generator; use Phpactor\AmpFsWatch\SystemDetector\CommandDetector; use Phpactor\AmpFsWatch\SystemDetector\OsDetector; use Phpactor\AmpFsWatch\Watcher; @@ -14,6 +11,9 @@ use Prophecy\Prophecy\ObjectProphecy; use Symfony\Component\Filesystem\Path; +use function Amp\async; +use function Amp\delay; + class InotifyWatcherTest extends WatcherTestCase { use \Prophecy\PhpUnit\ProphecyTrait; @@ -31,35 +31,35 @@ protected function setUp(): void $this->osValidator->isLinux()->willReturn(true); } - public function testIsSupported(): Generator + public function testIsSupported(): void { $watcher = $this->createWatcher(new WatcherConfig([])); - $this->commandDetector->commandExists('inotifywait')->willReturn(new Success(true)); + $this->commandDetector->commandExists('inotifywait')->willReturn(true); - self::assertTrue(yield $watcher->isSupported()); + self::assertTrue($watcher->isSupported()); } - public function testNotSupportedOnNonLinux(): Generator + public function testNotSupportedOnNonLinux(): void { $watcher = $this->createWatcher(new WatcherConfig([])); $this->osValidator->isLinux()->willReturn(false); - $this->commandDetector->commandExists('inotifywait')->willReturn(new Success(true)); - self::assertFalse(yield $watcher->isSupported()); + $this->commandDetector->commandExists('inotifywait')->willReturn(true); + self::assertFalse($watcher->isSupported()); } - public function testNotSupportedIfCommandNotFound(): Generator + public function testNotSupportedIfCommandNotFound(): void { $watcher = $this->createWatcher(new WatcherConfig([])); $this->osValidator->isLinux()->willReturn(true); - $this->commandDetector->commandExists('inotifywait')->willReturn(new Success(false)); - self::assertFalse(yield $watcher->isSupported()); + $this->commandDetector->commandExists('inotifywait')->willReturn(false); + self::assertFalse($watcher->isSupported()); } - public function testMove(): Generator + public function testMove(): void { - $process = yield $this->startProcess(); + $process = $this->startProcess(); - yield $this->delay(); + $this->delay(); $this->workspace()->put('foobar/baz.php', 'content'); $this->workspace()->put('foobar/bar.php', 'content'); @@ -67,18 +67,19 @@ public function testMove(): Generator $this->workspace()->put('foobar/1.php', 'content'); rename($this->workspace()->path('foobar'), $this->workspace()->path('barfoo')); - yield $this->delay(); - yield $this->delay(); + $this->delay(); + $this->delay(); $files = []; - \Amp\asyncCall(function () use (&$files, $process) { - while (null !== $file = yield $process->wait()) { + + async(function () use (&$files, $process): void { + while (null !== $file = $process->wait()) { $path = Path::makeRelative($file->path(), $this->workspace()->path()); $files[$path] = true; } }); - yield new Delayed(10); + delay(.01); $process->stop(); self::assertArrayHasKey('barfoo/bar.php', $files); diff --git a/tests/Watcher/PatternMatching/PatternMatchingWatcherTest.php b/tests/Watcher/PatternMatching/PatternMatchingWatcherTest.php index 3372229..acb555e 100644 --- a/tests/Watcher/PatternMatching/PatternMatchingWatcherTest.php +++ b/tests/Watcher/PatternMatching/PatternMatchingWatcherTest.php @@ -3,7 +3,6 @@ namespace Phpactor\AmpFsWatcher\Tests\Watcher\PatternMatching; use Amp\PHPUnit\AsyncTestCase; -use Generator; use Phpactor\AmpFsWatch\ModifiedFile; use Phpactor\AmpFsWatch\ModifiedFileQueue; use Phpactor\AmpFsWatch\Watcher; @@ -12,17 +11,16 @@ class PatternMatchingWatcherTest extends AsyncTestCase { - - public function testIncludesFiles() + public function testIncludesFiles(): void { - $process = yield $this->createWatcher(['/**/*.php'], [], [ + $process = $this->createWatcher(['/**/*.php'], [], [ $this->createFile('/Foobar.php'), $this->createFile('/Foobar.php~'), $this->createFile('/timestamp'), ])->watch(); $files = []; - while (null !== $file = yield $process->wait()) { + while (null !== $file = $process->wait()) { $files[] = $file; } @@ -30,25 +28,25 @@ public function testIncludesFiles() self::assertEquals($this->createFile('/Foobar.php'), $files[0]); } - public function testExcludesFiles() + public function testExcludesFiles(): void { - $process = yield $this->createWatcher(['/**/*.php'], ['/**/Foobar.php'], [ + $process = $this->createWatcher(['/**/*.php'], ['/**/Foobar.php'], [ $this->createFile('/Foobar.php'), $this->createFile('/Barfoo.php'), $this->createFile('/timestamp'), ])->watch(); $files = []; - while (null !== $file = yield $process->wait()) { + while (null !== $file = $process->wait()) { $files[] = $file; } self::assertCount(1, $files); } - public function testIsSupported(): Generator + public function testIsSupported(): void { - self::assertTrue(yield $this->createWatcher([], [], [])->isSupported()); + self::assertTrue($this->createWatcher([], [], [])->isSupported()); } protected function createWatcher(array $includePatterns, array $excludePatterns, array $modifiedFiles): Watcher { diff --git a/tests/Watcher/PhpPollWatcher/PhpPollWatcherTest.php b/tests/Watcher/PhpPollWatcher/PhpPollWatcherTest.php index 27b7348..3ec373b 100644 --- a/tests/Watcher/PhpPollWatcher/PhpPollWatcherTest.php +++ b/tests/Watcher/PhpPollWatcher/PhpPollWatcherTest.php @@ -2,7 +2,6 @@ namespace Phpactor\AmpFsWatcher\Tests\Watcher\PhpPollWatcher; -use Generator; use Phpactor\AmpFsWatch\Watcher; use Phpactor\AmpFsWatch\WatcherConfig; use Phpactor\AmpFsWatch\Watcher\PhpPollWatcher\PhpPollWatcher; @@ -10,17 +9,17 @@ class PhpPollWatcherTest extends WatcherTestCase { - - public function testIsSupported(): Generator + public function testIsSupported(): void { $watcher = $this->createWatcher(new WatcherConfig([])); - self::assertTrue(yield $watcher->isSupported()); + self::assertTrue($watcher->isSupported()); } - public function testRemoval(): Generator + public function testRemoval(): void { $this->markTestSkipped('Not supported'); } + protected function createWatcher(WatcherConfig $config): Watcher { return new PhpPollWatcher( diff --git a/tests/Watcher/WatcherTestCase.php b/tests/Watcher/WatcherTestCase.php index 8894b37..3d4ef82 100644 --- a/tests/Watcher/WatcherTestCase.php +++ b/tests/Watcher/WatcherTestCase.php @@ -2,9 +2,6 @@ namespace Phpactor\AmpFsWatcher\Tests\Watcher; -use Amp\Delayed; -use Amp\Promise; -use Generator; use Phpactor\AmpFsWatch\ModifiedFile; use Phpactor\AmpFsWatch\Watcher; use Phpactor\AmpFsWatch\WatcherConfig; @@ -13,6 +10,8 @@ use Psr\Log\LoggerInterface; use Phpactor\AmpFsWatcher\Tests\IntegrationTestCase; +use function Amp\delay; + abstract class WatcherTestCase extends IntegrationTestCase { const DELAY_MILLI = 20; @@ -20,128 +19,128 @@ abstract class WatcherTestCase extends IntegrationTestCase protected function setUp(): void { parent::setUp(); - $this->setTimeout(5000); + $this->setTimeout(5); $this->workspace()->reset(); } - public function testSingleFileChange(): Generator + public function testSingleFileChange(): void { - $process = yield $this->startProcess(); - yield $this->delay(); + $process = $this->startProcess(); + $this->delay(); $this->workspace()->put('foobar', ''); - yield $this->delay(); + $this->delay(); self::assertEquals( new ModifiedFile( $this->workspace()->path('foobar'), ModifiedFile::TYPE_FILE ), - yield $process->wait() + $process->wait() ); $process->stop(); } - public function testSingleFileChangeWithModificationTimeInPast(): Generator + public function testSingleFileChangeWithModificationTimeInPast(): void { - $process = yield $this->startProcess(); - yield $this->delay(); + $process = $this->startProcess(); + $this->delay(); touch($this->workspace()->path('foobar'), time() - 3600); - yield $this->delay(); + $this->delay(); self::assertEquals( new ModifiedFile( $this->workspace()->path('foobar'), ModifiedFile::TYPE_FILE ), - yield $process->wait() + $process->wait() ); $process->stop(); } - public function testMultipleSameFile(): Generator + public function testMultipleSameFile(): void { - $process = yield $this->startProcess(); + $process = $this->startProcess(); - yield $this->delay(); + $this->delay(); $this->workspace()->put('foobar', ''); $this->workspace()->put('foobar', 'foobar'); - yield $this->delay(); + $this->delay(); self::assertEquals( new ModifiedFile( $this->workspace()->path('foobar'), ModifiedFile::TYPE_FILE ), - yield $process->wait() + $process->wait() ); $process->stop(); } - public function testDirectory(): Generator + public function testDirectory(): void { - $process = yield $this->startProcess(); + $process = $this->startProcess(); - yield $this->delay(); + $this->delay(); $this->workspace()->mkdir('foobar'); - yield $this->delay(); + $this->delay(); self::assertEquals( new ModifiedFile( $this->workspace()->path('foobar'), ModifiedFile::TYPE_FOLDER ), - yield $process->wait() + $process->wait() ); $process->stop(); } - public function testRemoval(): Generator + public function testRemoval(): void { $this->workspace()->put('foobar', ''); - $process = yield $this->startProcess(); + $process = $this->startProcess(); - yield $this->delay(); + $this->delay(); unlink($this->workspace()->path('foobar')); - yield $this->delay(); + $this->delay(); self::assertEquals( new ModifiedFile( $this->workspace()->path('foobar'), ModifiedFile::TYPE_FILE ), - yield $process->wait() + $process->wait() ); $process->stop(); } - public function testMultiplePaths(): Generator + public function testMultiplePaths(): void { $this->workspace()->mkdir('foobar'); $this->workspace()->mkdir('barfoo'); - $process = yield $this->startProcess([ + $process = $this->startProcess([ $this->workspace()->path('barfoo'), $this->workspace()->path('foobar'), ]); - yield $this->delay(); + $this->delay(); $this->workspace()->put('barfoo/foobar', ''); $this->workspace()->put('foobar/barfoo', ''); - yield $this->delay(); + $this->delay(); $files = []; for ($i = 0; $i < 2; $i++) { - $file = yield $process->wait(); + $file = $process->wait(); $files[$file->path()] = $file; } @@ -156,13 +155,13 @@ public function testReturnsNameAsString(): void self::assertIsString($this->createWatcher(new WatcherConfig([]))->describe()); } - abstract public function testIsSupported(): Generator; + abstract public function testIsSupported(): void; abstract protected function createWatcher(WatcherConfig $config): Watcher; - protected function delay(): Delayed + protected function delay(): void { - return new Delayed(self::DELAY_MILLI); + delay(self::DELAY_MILLI / 1000); } protected function createLogger(): LoggerInterface @@ -173,17 +172,15 @@ public function log($level, $message, array $context = []): void if ($level === 'debug') { return; } - fwrite(STDERR, sprintf('[%s] [%s] %s', microtime(), $level, $message)."\n"); + fwrite(STDERR, sprintf('[%s] [%s] %s', microtime(), $level, $message) . "\n"); } }; } /** * @param array $paths - * - * @return Promise */ - protected function startProcess(?array $paths = []): Promise + protected function startProcess(?array $paths = []): WatcherProcess { $paths = $paths ?: [ $this->workspace()->path() ]; $watcher = $this->createWatcher(new WatcherConfig($paths)); diff --git a/tests/Watcher/Watchman/WatchmanWatcherTest.php b/tests/Watcher/Watchman/WatchmanWatcherTest.php index 8590389..590e6c4 100644 --- a/tests/Watcher/Watchman/WatchmanWatcherTest.php +++ b/tests/Watcher/Watchman/WatchmanWatcherTest.php @@ -2,34 +2,30 @@ namespace Phpactor\AmpFsWatcher\Tests\Watcher\Watchman; -use Amp\Success; -use Generator; use Phpactor\AmpFsWatch\SystemDetector\CommandDetector; use Phpactor\AmpFsWatch\Watcher; use Phpactor\AmpFsWatch\WatcherConfig; use Phpactor\AmpFsWatch\Watcher\Watchman\WatchmanWatcher; use Phpactor\AmpFsWatcher\Tests\Watcher\WatcherTestCase; +use Prophecy\Prophecy\ObjectProphecy; class WatchmanWatcherTest extends WatcherTestCase { use \Prophecy\PhpUnit\ProphecyTrait; private const PLAN_DELAY = 100; - /** - * @var ObjectProphecy|CommandDetector - */ - private $commandDetector; + private ObjectProphecy|CommandDetector $commandDetector; - public function testIsSupported(): Generator + public function testIsSupported(): void { $watcher = $this->createWatcher(new WatcherConfig([])); - self::assertTrue(yield $watcher->isSupported()); + self::assertTrue($watcher->isSupported()); } protected function createWatcher(WatcherConfig $config): Watcher { $this->commandDetector = $this->prophesize(CommandDetector::class); - $this->commandDetector->commandExists('watchman')->willReturn(new Success(true)); + $this->commandDetector->commandExists('watchman')->willReturn(true); return new WatchmanWatcher( $config->withPollInterval(100), From 981ef7b0ce302864a867c94fa2dc3bf7b9447c78 Mon Sep 17 00:00:00 2001 From: przepompownia Date: Thu, 29 Jan 2026 02:42:34 +0100 Subject: [PATCH 2/4] Wait for watchers in separate fibers --- bin/watch | 3 ++- tests/Watcher/WatcherTestCase.php | 15 ++++++++------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/bin/watch b/bin/watch index 1f9a581..3bd788c 100755 --- a/bin/watch +++ b/bin/watch @@ -10,6 +10,7 @@ use Phpactor\AmpFsWatch\Watcher\PatternMatching\PatternMatchingWatcher; use Phpactor\AmpFsWatch\Watcher\PhpPollWatcher\PhpPollWatcher; use Psr\Log\AbstractLogger; use Revolt\EventLoop; +use function Amp\async; require __DIR__ . '/../vendor/autoload.php'; @@ -46,6 +47,6 @@ if (defined('SIGINT')) { }); } -while (null !== $file = $process->wait()) { +while (null !== $file = async(fn () => $process->wait())->await()) { fwrite(STDOUT, sprintf('[%s] %s (%s)' . "\n", date('Y-m-d H:i:s.u'), $file->path(), $file->type())); } diff --git a/tests/Watcher/WatcherTestCase.php b/tests/Watcher/WatcherTestCase.php index 3d4ef82..35df1de 100644 --- a/tests/Watcher/WatcherTestCase.php +++ b/tests/Watcher/WatcherTestCase.php @@ -10,11 +10,12 @@ use Psr\Log\LoggerInterface; use Phpactor\AmpFsWatcher\Tests\IntegrationTestCase; +use function Amp\async; use function Amp\delay; abstract class WatcherTestCase extends IntegrationTestCase { - const DELAY_MILLI = 20; + const DELAY_MILLI = 30; protected function setUp(): void { @@ -35,7 +36,7 @@ public function testSingleFileChange(): void $this->workspace()->path('foobar'), ModifiedFile::TYPE_FILE ), - $process->wait() + async(fn () => $process->wait())->await(), ); $process->stop(); @@ -53,7 +54,7 @@ public function testSingleFileChangeWithModificationTimeInPast(): void $this->workspace()->path('foobar'), ModifiedFile::TYPE_FILE ), - $process->wait() + async(fn () => $process->wait())->await(), ); $process->stop(); @@ -73,7 +74,7 @@ public function testMultipleSameFile(): void $this->workspace()->path('foobar'), ModifiedFile::TYPE_FILE ), - $process->wait() + async(fn () => $process->wait())->await(), ); $process->stop(); @@ -92,7 +93,7 @@ public function testDirectory(): void $this->workspace()->path('foobar'), ModifiedFile::TYPE_FOLDER ), - $process->wait() + async(fn () => $process->wait())->await(), ); $process->stop(); @@ -115,7 +116,7 @@ public function testRemoval(): void $this->workspace()->path('foobar'), ModifiedFile::TYPE_FILE ), - $process->wait() + async(fn () => $process->wait())->await(), ); $process->stop(); @@ -140,7 +141,7 @@ public function testMultiplePaths(): void $files = []; for ($i = 0; $i < 2; $i++) { - $file = $process->wait(); + $file = async(fn () => $process->wait())->await(); $files[$file->path()] = $file; } From 3a1672f3ff40418ec051e6811348f12bfadd133d Mon Sep 17 00:00:00 2001 From: przepompownia Date: Mon, 23 Feb 2026 18:26:28 +0100 Subject: [PATCH 3/4] bin/watch: allow to watch more than one path --- bin/watch | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/bin/watch b/bin/watch index 3bd788c..98a860b 100755 --- a/bin/watch +++ b/bin/watch @@ -17,11 +17,13 @@ require __DIR__ . '/../vendor/autoload.php'; echo "This is a demo application\n"; if (!isset($argv[1])) { - echo 'You must specify a path to watch'; + echo 'You must specify at least one path to watch'; exit(1); } -$path = $argv[1]; +array_shift($argv); + +$paths = $argv; $logger = new class extends AbstractLogger { public function log($level, $message, array $context = []): void { @@ -30,7 +32,7 @@ $logger = new class extends AbstractLogger { }; -$config = new WatcherConfig([$path]); +$config = new WatcherConfig($paths); $watcher = new PatternMatchingWatcher(new FallbackWatcher([ new InotifyWatcher($config, $logger), new FindWatcher($config, $logger), From f595d7a945958d5882bb8935034be9dd3302fd38 Mon Sep 17 00:00:00 2001 From: przepompownia Date: Mon, 23 Feb 2026 18:32:10 +0100 Subject: [PATCH 4/4] FindWatcher: parallelize searching in config paths --- src/Watcher/Find/FindWatcher.php | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/Watcher/Find/FindWatcher.php b/src/Watcher/Find/FindWatcher.php index 747126f..8f0d8c8 100644 --- a/src/Watcher/Find/FindWatcher.php +++ b/src/Watcher/Find/FindWatcher.php @@ -16,6 +16,7 @@ use RuntimeException; use function Amp\ByteStream\splitLines; +use function Amp\Future\await; use function Amp\async; use function Amp\delay; @@ -56,13 +57,14 @@ public function watch(): WatcherProcess $this->updateDateReference(); $this->running = true; - async(function (): void { - delay(.01); + delay(.01); + async(function (): void { while ($this->running) { - foreach ($this->config->paths() as $path) { - $this->search($path); - } + await(array_map( + fn (string $path) => async(fn () => $this->search($path)), + $this->config->paths(), + )); $this->updateDateReference(); delay($this->config->pollInterval() / 1000); }