From 1d9ecd9c5356db2f65f1f4d9062b528e19ec697c Mon Sep 17 00:00:00 2001 From: viktorprogger Date: Sun, 1 Feb 2026 21:51:19 +0500 Subject: [PATCH] Divide queue and channel names --- config/di.php | 6 +- config/params.php | 4 +- src/Adapter/SynchronousAdapter.php | 8 +- src/ChannelNormalizer.php | 18 --- src/Command/ListenAllCommand.php | 20 +-- src/Command/ListenCommand.php | 8 +- src/Command/RunCommand.php | 20 +-- src/Debug/QueueCollector.php | 10 +- src/Debug/QueueDecorator.php | 11 +- src/Debug/QueueProviderInterfaceProxy.php | 9 +- .../FailureMiddlewareDispatcher.php | 15 ++- src/Provider/AdapterFactoryQueueProvider.php | 38 +++--- src/Provider/ChannelNotFoundException.php | 27 ---- src/Provider/CompositeQueueProvider.php | 12 +- src/Provider/PrototypeQueueProvider.php | 10 +- src/Provider/QueueNotFoundException.php | 27 ++++ src/Provider/QueueProviderInterface.php | 18 +-- src/Queue.php | 13 +- src/QueueInterface.php | 14 ++- src/QueueNameNormalizer.php | 18 +++ stubs/StubAdapter.php | 8 +- stubs/StubQueue.php | 19 +-- tests/App/DummyQueue.php | 9 +- tests/App/FakeAdapter.php | 4 +- tests/Integration/MiddlewareTest.php | 3 +- tests/Unit/Adapter/SynchronousAdapterTest.php | 2 +- tests/Unit/Command/ListenCommandTest.php | 53 ++++++-- tests/Unit/Command/RunCommandTest.php | 116 ++++++++++++++++-- tests/Unit/Debug/QueueDecoratorTest.php | 6 +- .../AdapterFactoryQueueProviderTest.php | 11 +- .../Provider/CompositeQueueProviderTest.php | 8 +- .../Provider/PrototypeQueueProviderTest.php | 2 +- ...est.php => QueueNotFoundExceptionTest.php} | 6 +- tests/Unit/QueueTest.php | 2 +- tests/Unit/WorkerTest.php | 4 +- 35 files changed, 356 insertions(+), 203 deletions(-) delete mode 100644 src/ChannelNormalizer.php delete mode 100644 src/Provider/ChannelNotFoundException.php create mode 100644 src/Provider/QueueNotFoundException.php create mode 100644 src/QueueNameNormalizer.php rename tests/Unit/Provider/{ChannelNotFoundExceptionTest.php => QueueNotFoundExceptionTest.php} (78%) diff --git a/config/di.php b/config/di.php index a2fb3c01..273be5d1 100644 --- a/config/di.php +++ b/config/di.php @@ -31,7 +31,7 @@ return [ AdapterFactoryQueueProvider::class => [ '__construct()' => [ - 'definitions' => $params['yiisoft/queue']['channels'], + 'definitions' => $params['yiisoft/queue']['queues'], ], ], QueueProviderInterface::class => AdapterFactoryQueueProvider::class, @@ -61,12 +61,12 @@ MessageSerializerInterface::class => JsonMessageSerializer::class, RunCommand::class => [ '__construct()' => [ - 'channels' => array_keys($params['yiisoft/queue']['channels']), + 'queues' => array_keys($params['yiisoft/queue']['queues']), ], ], ListenAllCommand::class => [ '__construct()' => [ - 'channels' => array_keys($params['yiisoft/queue']['channels']), + 'queues' => array_keys($params['yiisoft/queue']['queues']), ], ], ]; diff --git a/config/params.php b/config/params.php index 2195a8cb..90b8492f 100644 --- a/config/params.php +++ b/config/params.php @@ -22,8 +22,8 @@ ], 'yiisoft/queue' => [ 'handlers' => [], - 'channels' => [ - QueueProviderInterface::DEFAULT_CHANNEL => AdapterInterface::class, + 'queues' => [ + QueueProviderInterface::DEFAULT_QUEUE => AdapterInterface::class, ], 'middlewares-push' => [], 'middlewares-consume' => [], diff --git a/src/Adapter/SynchronousAdapter.php b/src/Adapter/SynchronousAdapter.php index 5c8f7743..0e503e6a 100644 --- a/src/Adapter/SynchronousAdapter.php +++ b/src/Adapter/SynchronousAdapter.php @@ -6,7 +6,7 @@ use BackedEnum; use InvalidArgumentException; -use Yiisoft\Queue\ChannelNormalizer; +use Yiisoft\Queue\QueueNameNormalizer; use Yiisoft\Queue\JobStatus; use Yiisoft\Queue\Message\MessageInterface; use Yiisoft\Queue\Provider\QueueProviderInterface; @@ -25,9 +25,9 @@ final class SynchronousAdapter implements AdapterInterface public function __construct( private readonly WorkerInterface $worker, private readonly QueueInterface $queue, - string|BackedEnum $channel = QueueProviderInterface::DEFAULT_CHANNEL, + string|BackedEnum $channel = QueueProviderInterface::DEFAULT_QUEUE, ) { - $this->channel = ChannelNormalizer::normalize($channel); + $this->channel = QueueNameNormalizer::normalize($channel); } public function __destruct() @@ -83,7 +83,7 @@ public function subscribe(callable $handlerCallback): void public function withChannel(string|BackedEnum $channel): self { - $channel = ChannelNormalizer::normalize($channel); + $channel = QueueNameNormalizer::normalize($channel); if ($channel === $this->channel) { return $this; diff --git a/src/ChannelNormalizer.php b/src/ChannelNormalizer.php deleted file mode 100644 index 1b178188..00000000 --- a/src/ChannelNormalizer.php +++ /dev/null @@ -1,18 +0,0 @@ -value : $channel; - } -} diff --git a/src/Command/ListenAllCommand.php b/src/Command/ListenAllCommand.php index 8d61a4b6..66f88f7c 100644 --- a/src/Command/ListenAllCommand.php +++ b/src/Command/ListenAllCommand.php @@ -25,7 +25,7 @@ final class ListenAllCommand extends Command public function __construct( private readonly QueueProviderInterface $queueProvider, private readonly LoopInterface $loop, - private readonly array $channels, + private readonly array $queues, ) { parent::__construct(); } @@ -36,36 +36,36 @@ public function __construct( public function configure(): void { $this->addArgument( - 'channel', + 'queue', InputArgument::OPTIONAL | InputArgument::IS_ARRAY, - 'Queue channel name list to connect to', - $this->channels, + 'Queue name list to connect to', + $this->queues, ) ->addOption( 'pause', 'p', InputOption::VALUE_REQUIRED, - 'Pause between queue channel iterations in seconds. May save some CPU. Default: 1', + 'Pause between queue iterations in seconds. May save some CPU. Default: 1', 1, ) ->addOption( 'maximum', 'm', InputOption::VALUE_REQUIRED, - 'Maximum number of messages to process in each channel before switching to another channel. ' + 'Maximum number of messages to process in each queue before switching to another queue. ' . 'Default is 0 (no limits).', 0, ); - $this->addUsage('[channel1 [channel2 [...]]] [--timeout=] [--maximum=]'); + $this->addUsage('[queue1 [queue2 [...]]] [--timeout=] [--maximum=]'); } protected function execute(InputInterface $input, OutputInterface $output): int { $queues = []; - /** @var string $channel */ - foreach ($input->getArgument('channel') as $channel) { - $queues[] = $this->queueProvider->get($channel); + /** @var string $queue */ + foreach ($input->getArgument('queue') as $queue) { + $queues[] = $this->queueProvider->get($queue); } $pauseSeconds = (int) $input->getOption('pause'); diff --git a/src/Command/ListenCommand.php b/src/Command/ListenCommand.php index 7b2859dc..24b778ab 100644 --- a/src/Command/ListenCommand.php +++ b/src/Command/ListenCommand.php @@ -26,17 +26,17 @@ public function __construct( public function configure(): void { $this->addArgument( - 'channel', + 'queue', InputArgument::OPTIONAL, - 'Queue channel name to connect to', - QueueProviderInterface::DEFAULT_CHANNEL, + 'Queue name to connect to', + QueueProviderInterface::DEFAULT_QUEUE, ); } protected function execute(InputInterface $input, OutputInterface $output): int { $this->queueProvider - ->get($input->getArgument('channel')) + ->get($input->getArgument('queue')) ->listen(); return 0; diff --git a/src/Command/RunCommand.php b/src/Command/RunCommand.php index 377a5dc0..7f1aeab7 100644 --- a/src/Command/RunCommand.php +++ b/src/Command/RunCommand.php @@ -20,7 +20,7 @@ final class RunCommand extends Command { public function __construct( private readonly QueueProviderInterface $queueProvider, - private readonly array $channels, + private readonly array $queues, ) { parent::__construct(); } @@ -28,28 +28,28 @@ public function __construct( public function configure(): void { $this->addArgument( - 'channel', + 'queue', InputArgument::OPTIONAL | InputArgument::IS_ARRAY, - 'Queue channel name list to connect to.', - $this->channels, + 'Queue name list to connect to.', + $this->queues, ) ->addOption( 'maximum', 'm', InputOption::VALUE_REQUIRED, - 'Maximum number of messages to process in each channel. Default is 0 (no limits).', + 'Maximum number of messages to process in each queue. Default is 0 (no limits).', 0, ) - ->addUsage('[channel1 [channel2 [...]]] --maximum 100'); + ->addUsage('[queue1 [queue2 [...]]] --maximum 100'); } protected function execute(InputInterface $input, OutputInterface $output): int { - /** @var string $channel */ - foreach ($input->getArgument('channel') as $channel) { - $output->write("Processing channel $channel... "); + /** @var string $queue */ + foreach ($input->getArgument('queue') as $queue) { + $output->write("Processing queue $queue... "); $count = $this->queueProvider - ->get($channel) + ->get($queue) ->run((int) $input->getOption('maximum')); $output->writeln("Messages processed: $count."); diff --git a/src/Debug/QueueCollector.php b/src/Debug/QueueCollector.php index d0950178..0f48c230 100644 --- a/src/Debug/QueueCollector.php +++ b/src/Debug/QueueCollector.php @@ -47,18 +47,18 @@ public function collectStatus(string $id, JobStatus $status): void } public function collectPush( - ?string $channel, + ?string $queueName, MessageInterface $message, string|array|callable|MiddlewarePushInterface ...$middlewareDefinitions, ): void { if (!$this->isActive()) { return; } - if ($channel === null) { - $channel = 'null'; + if ($queueName === null) { + $queueName = 'null'; } - $this->pushes[$channel][] = [ + $this->pushes[$queueName][] = [ 'message' => $message, 'middlewares' => $middlewareDefinitions, ]; @@ -69,7 +69,7 @@ public function collectWorkerProcessing(MessageInterface $message, QueueInterfac if (!$this->isActive()) { return; } - $this->processingMessages[$queue->getChannel()][] = $message; + $this->processingMessages[$queue->getName()][] = $message; } public function getSummary(): array diff --git a/src/Debug/QueueDecorator.php b/src/Debug/QueueDecorator.php index 5e177250..f0bc7353 100644 --- a/src/Debug/QueueDecorator.php +++ b/src/Debug/QueueDecorator.php @@ -4,6 +4,7 @@ namespace Yiisoft\Queue\Debug; +use BackedEnum; use Yiisoft\Queue\Adapter\AdapterInterface; use Yiisoft\Queue\JobStatus; use Yiisoft\Queue\Message\MessageInterface; @@ -30,7 +31,7 @@ public function push( string|array|callable|MiddlewarePushInterface ...$middlewareDefinitions, ): MessageInterface { $message = $this->queue->push($message, ...$middlewareDefinitions); - $this->collector->collectPush($this->queue->getChannel(), $message, ...$middlewareDefinitions); + $this->collector->collectPush($this->queue->getName(), $message, ...$middlewareDefinitions); return $message; } @@ -44,13 +45,13 @@ public function listen(): void $this->queue->listen(); } - public function withAdapter(AdapterInterface $adapter): static + public function withAdapter(AdapterInterface $adapter, string|BackedEnum|null $queueName = null): static { - return new self($this->queue->withAdapter($adapter), $this->collector); + return new self($this->queue->withAdapter($adapter, $queueName), $this->collector); } - public function getChannel(): string + public function getName(): string { - return $this->queue->getChannel(); + return $this->queue->getName(); } } diff --git a/src/Debug/QueueProviderInterfaceProxy.php b/src/Debug/QueueProviderInterfaceProxy.php index a1ec5e6e..e01d3775 100644 --- a/src/Debug/QueueProviderInterfaceProxy.php +++ b/src/Debug/QueueProviderInterfaceProxy.php @@ -15,14 +15,15 @@ public function __construct( private readonly QueueCollector $collector, ) {} - public function get(string|BackedEnum $channel): QueueInterface + public function get(string|BackedEnum $queue): QueueInterface { - $queue = $this->queueProvider->get($channel); + $queue = $this->queueProvider->get($queue); + return new QueueDecorator($queue, $this->collector); } - public function has(string|BackedEnum $channel): bool + public function has(string|BackedEnum $queue): bool { - return $this->queueProvider->has($channel); + return $this->queueProvider->has($queue); } } diff --git a/src/Middleware/FailureHandling/FailureMiddlewareDispatcher.php b/src/Middleware/FailureHandling/FailureMiddlewareDispatcher.php index c5c23b68..226aaca5 100644 --- a/src/Middleware/FailureHandling/FailureMiddlewareDispatcher.php +++ b/src/Middleware/FailureHandling/FailureMiddlewareDispatcher.php @@ -37,18 +37,17 @@ public function dispatch( FailureHandlingRequest $request, MessageFailureHandlerInterface $finishHandler, ): FailureHandlingRequest { - /** @var string $channel It is always string in this context */ - $channel = $request->getQueue()->getChannel(); - if (!isset($this->middlewareDefinitions[$channel]) || $this->middlewareDefinitions[$channel] === []) { - $channel = self::DEFAULT_PIPELINE; + $queueName = $request->getQueue()->getName(); + if (!isset($this->middlewareDefinitions[$queueName]) || $this->middlewareDefinitions[$queueName] === []) { + $queueName = self::DEFAULT_PIPELINE; } - $definitions = array_reverse($this->middlewareDefinitions[$channel]); + $definitions = array_reverse($this->middlewareDefinitions[$queueName]); - if (!isset($this->stack[$channel])) { - $this->stack[$channel] = new MiddlewareFailureStack($this->buildMiddlewares(...$definitions), $finishHandler); + if (!isset($this->stack[$queueName])) { + $this->stack[$queueName] = new MiddlewareFailureStack($this->buildMiddlewares(...$definitions), $finishHandler); } - return $this->stack[$channel]->handleFailure($request); + return $this->stack[$queueName]->handleFailure($request); } /** diff --git a/src/Provider/AdapterFactoryQueueProvider.php b/src/Provider/AdapterFactoryQueueProvider.php index 1e52875d..91abe60b 100644 --- a/src/Provider/AdapterFactoryQueueProvider.php +++ b/src/Provider/AdapterFactoryQueueProvider.php @@ -9,7 +9,7 @@ use Yiisoft\Definitions\Exception\InvalidConfigException; use Yiisoft\Factory\StrictFactory; use Yiisoft\Queue\Adapter\AdapterInterface; -use Yiisoft\Queue\ChannelNormalizer; +use Yiisoft\Queue\QueueNameNormalizer; use Yiisoft\Queue\QueueInterface; use function array_key_exists; @@ -32,7 +32,7 @@ final class AdapterFactoryQueueProvider implements QueueProviderInterface /** * @param QueueInterface $baseQueue Base queue for queues creation. - * @param array $definitions Adapter definitions indexed by channel names. + * @param array $definitions Adapter definitions indexed by queueName names. * @param ContainerInterface|null $container Container to use for dependencies resolving. * @param bool $validate If definitions should be validated when set. * @@ -52,50 +52,50 @@ public function __construct( } } - public function get(string|BackedEnum $channel): QueueInterface + public function get(string|BackedEnum $queueName): QueueInterface { - $channel = ChannelNormalizer::normalize($channel); + $queueName = QueueNameNormalizer::normalize($queueName); - $queue = $this->getOrTryToCreate($channel); + $queue = $this->getOrTryToCreate($queueName); if ($queue === null) { - throw new ChannelNotFoundException($channel); + throw new QueueNotFoundException($queueName); } return $queue; } - public function has(string|BackedEnum $channel): bool + public function has(string|BackedEnum $queueName): bool { - $channel = ChannelNormalizer::normalize($channel); - return $this->factory->has($channel); + $queueName = QueueNameNormalizer::normalize($queueName); + return $this->factory->has($queueName); } /** * @throws InvalidQueueConfigException */ - private function getOrTryToCreate(string $channel): ?QueueInterface + private function getOrTryToCreate(string $queueName): ?QueueInterface { - if (array_key_exists($channel, $this->queues)) { - return $this->queues[$channel]; + if (array_key_exists($queueName, $this->queues)) { + return $this->queues[$queueName]; } - if ($this->factory->has($channel)) { - $adapter = $this->factory->create($channel); + if ($this->factory->has($queueName)) { + $adapter = $this->factory->create($queueName); if (!$adapter instanceof AdapterInterface) { throw new InvalidQueueConfigException( sprintf( - 'Adapter must implement "%s". For channel "%s" got "%s" instead.', + 'Adapter must implement "%s". For queueName "%s" got "%s" instead.', AdapterInterface::class, - $channel, + $queueName, get_debug_type($adapter), ), ); } - $this->queues[$channel] = $this->baseQueue->withAdapter($adapter->withChannel($channel)); + $this->queues[$queueName] = $this->baseQueue->withAdapter($adapter, $queueName); } else { - $this->queues[$channel] = null; + $this->queues[$queueName] = null; } - return $this->queues[$channel]; + return $this->queues[$queueName]; } } diff --git a/src/Provider/ChannelNotFoundException.php b/src/Provider/ChannelNotFoundException.php deleted file mode 100644 index ccd43bd3..00000000 --- a/src/Provider/ChannelNotFoundException.php +++ /dev/null @@ -1,27 +0,0 @@ -providers = $providers; } - public function get(string|BackedEnum $channel): QueueInterface + public function get(string|BackedEnum $queueName): QueueInterface { foreach ($this->providers as $provider) { - if ($provider->has($channel)) { - return $provider->get($channel); + if ($provider->has($queueName)) { + return $provider->get($queueName); } } - throw new ChannelNotFoundException($channel); + throw new QueueNotFoundException($queueName); } - public function has(string|BackedEnum $channel): bool + public function has(string|BackedEnum $queueName): bool { foreach ($this->providers as $provider) { - if ($provider->has($channel)) { + if ($provider->has($queueName)) { return true; } } diff --git a/src/Provider/PrototypeQueueProvider.php b/src/Provider/PrototypeQueueProvider.php index 84cdcf7d..0d48e02e 100644 --- a/src/Provider/PrototypeQueueProvider.php +++ b/src/Provider/PrototypeQueueProvider.php @@ -9,8 +9,8 @@ use Yiisoft\Queue\QueueInterface; /** - * Queue provider that only changes the channel name of the base queue. - * It can be useful when your queues used the same adapter. + * Queue provider that only changes the channel name of the base adapter and sets queue name to the same value. + * It can be useful when your queues use the same adapter, which only changes the broker channel name. */ final class PrototypeQueueProvider implements QueueProviderInterface { @@ -22,12 +22,12 @@ public function __construct( private readonly AdapterInterface $baseAdapter, ) {} - public function get(string|BackedEnum $channel): QueueInterface + public function get(string|BackedEnum $queueName): QueueInterface { - return $this->baseQueue->withAdapter($this->baseAdapter->withChannel($channel)); + return $this->baseQueue->withAdapter($this->baseAdapter->withChannel($queueName), $queueName); } - public function has(string|BackedEnum $channel): bool + public function has(string|BackedEnum $queueName): bool { return true; } diff --git a/src/Provider/QueueNotFoundException.php b/src/Provider/QueueNotFoundException.php new file mode 100644 index 00000000..bc302d40 --- /dev/null +++ b/src/Provider/QueueNotFoundException.php @@ -0,0 +1,27 @@ +adapterPushHandler = new AdapterPushHandler(); } - public function getChannel(): string + public function getName(): string { - $this->checkAdapter(); - return $this->adapter->getChannel(); + return $this->name; } public function push( @@ -109,10 +111,13 @@ public function status(string|int $id): JobStatus return $this->adapter->status($id); } - public function withAdapter(AdapterInterface $adapter): static + public function withAdapter(AdapterInterface $adapter, string|BackedEnum|null $queueName = null): static { $new = clone $this; $new->adapter = $adapter; + if ($queueName !== null) { + $new->name = QueueNameNormalizer::normalize($queueName); + } return $new; } diff --git a/src/QueueInterface.php b/src/QueueInterface.php index 1985c32e..3722e00f 100644 --- a/src/QueueInterface.php +++ b/src/QueueInterface.php @@ -4,6 +4,7 @@ namespace Yiisoft\Queue; +use BackedEnum; use InvalidArgumentException; use Yiisoft\Queue\Adapter\AdapterInterface; use Yiisoft\Queue\Message\MessageInterface; @@ -40,7 +41,16 @@ public function listen(): void; */ public function status(string|int $id): JobStatus; - public function withAdapter(AdapterInterface $adapter): static; + /** + * @param AdapterInterface $adapter Adapter to use. + * @param string|null $queueName Queue name to use. + * + * @return static A new queue with the given adapter and queue name. + */ + public function withAdapter(AdapterInterface $adapter, string|BackedEnum|null $queueName = null): static; - public function getChannel(): string; + /** + * Returns the logical name of the queue. + */ + public function getName(): string; } diff --git a/src/QueueNameNormalizer.php b/src/QueueNameNormalizer.php new file mode 100644 index 00000000..c5406bd0 --- /dev/null +++ b/src/QueueNameNormalizer.php @@ -0,0 +1,18 @@ +value : $queueName; + } +} diff --git a/stubs/StubAdapter.php b/stubs/StubAdapter.php index e7332e2d..e0f6270f 100644 --- a/stubs/StubAdapter.php +++ b/stubs/StubAdapter.php @@ -6,7 +6,7 @@ use BackedEnum; use Yiisoft\Queue\Adapter\AdapterInterface; -use Yiisoft\Queue\ChannelNormalizer; +use Yiisoft\Queue\QueueNameNormalizer; use Yiisoft\Queue\JobStatus; use Yiisoft\Queue\Message\MessageInterface; use Yiisoft\Queue\Provider\QueueProviderInterface; @@ -19,9 +19,9 @@ final class StubAdapter implements AdapterInterface private string $channel; public function __construct( - string|BackedEnum $channel = QueueProviderInterface::DEFAULT_CHANNEL + string|BackedEnum $channel = QueueProviderInterface::DEFAULT_QUEUE ) { - $this->channel = ChannelNormalizer::normalize($channel); + $this->channel = QueueNameNormalizer::normalize($channel); } public function runExisting(callable $handlerCallback): void @@ -45,7 +45,7 @@ public function subscribe(callable $handlerCallback): void public function withChannel(string|BackedEnum $channel): AdapterInterface { $new = clone $this; - $new->channel = ChannelNormalizer::normalize($channel); + $new->channel = QueueNameNormalizer::normalize($channel); return $new; } diff --git a/stubs/StubQueue.php b/stubs/StubQueue.php index 4d2ed10a..3532adca 100644 --- a/stubs/StubQueue.php +++ b/stubs/StubQueue.php @@ -4,18 +4,21 @@ namespace Yiisoft\Queue\Stubs; -use LogicException; +use BackedEnum; use Yiisoft\Queue\Adapter\AdapterInterface; use Yiisoft\Queue\JobStatus; use Yiisoft\Queue\Message\MessageInterface; use Yiisoft\Queue\Middleware\Push\MiddlewarePushInterface; use Yiisoft\Queue\QueueInterface; +use Yiisoft\Queue\QueueNameNormalizer; /** * Stub queue that does nothing. Job status is always "done". */ final class StubQueue implements QueueInterface { + private string $name = 'default'; + public function __construct(private ?AdapterInterface $adapter = null) { } @@ -46,20 +49,20 @@ public function getAdapter(): ?AdapterInterface return $this->adapter; } - public function withAdapter(AdapterInterface $adapter): static + public function withAdapter(AdapterInterface $adapter, string|BackedEnum|null $queueName = null): static { $new = clone $this; $new->adapter = $adapter; + if ($queueName !== null) { + $new->name = QueueNameNormalizer::normalize($queueName); + } + return $new; } - public function getChannel(): string + public function getName(): string { - if ($this->adapter === null) { - throw new LogicException('Adapter is not set.'); - } - - return $this->adapter->getChannel(); + return $this->name; } } diff --git a/tests/App/DummyQueue.php b/tests/App/DummyQueue.php index 9286ab64..f15ee673 100644 --- a/tests/App/DummyQueue.php +++ b/tests/App/DummyQueue.php @@ -4,6 +4,7 @@ namespace Yiisoft\Queue\Tests\App; +use BackedEnum; use Exception; use Yiisoft\Queue\Adapter\AdapterInterface; use Yiisoft\Queue\JobStatus; @@ -14,7 +15,7 @@ final class DummyQueue implements QueueInterface { public function __construct( - private readonly string $channel, + private readonly string $name, ) {} public function push( @@ -36,13 +37,13 @@ public function status(string|int $id): JobStatus throw new Exception('`status()` method is not implemented yet.'); } - public function withAdapter(AdapterInterface $adapter): static + public function withAdapter(AdapterInterface $adapter, string|BackedEnum|null $queueName = null): static { throw new Exception('`withAdapter()` method is not implemented yet.'); } - public function getChannel(): string + public function getName(): string { - return $this->channel; + return $this->name; } } diff --git a/tests/App/FakeAdapter.php b/tests/App/FakeAdapter.php index db5cb39d..f4529854 100644 --- a/tests/App/FakeAdapter.php +++ b/tests/App/FakeAdapter.php @@ -6,7 +6,7 @@ use BackedEnum; use Yiisoft\Queue\Adapter\AdapterInterface; -use Yiisoft\Queue\ChannelNormalizer; +use Yiisoft\Queue\QueueNameNormalizer; use Yiisoft\Queue\JobStatus; use Yiisoft\Queue\Message\MessageInterface; @@ -41,7 +41,7 @@ public function withChannel(string|BackedEnum $channel): AdapterInterface { $instance = clone $this; $instance->pushMessages = []; - $instance->channel = ChannelNormalizer::normalize($channel); + $instance->channel = QueueNameNormalizer::normalize($channel); return $instance; } diff --git a/tests/Integration/MiddlewareTest.php b/tests/Integration/MiddlewareTest.php index 57b62d62..02e02ddf 100644 --- a/tests/Integration/MiddlewareTest.php +++ b/tests/Integration/MiddlewareTest.php @@ -63,6 +63,7 @@ public function testFullStackPush(): void $this->createMock(LoopInterface::class), $this->createMock(LoggerInterface::class), $pushMiddlewareDispatcher, + 'test', new SynchronousAdapter( $this->createMock(WorkerInterface::class), $this->createMock(QueueInterface::class), @@ -136,7 +137,7 @@ public function testFullStackFailure(): void $callableFactory = new CallableFactory($container); $queue->expects(self::exactly(7))->method('push')->willReturnCallback($queueCallback); - $queue->method('getChannel')->willReturn('simple'); + $queue->method('getName')->willReturn('simple'); $middlewares = [ 'simple' => [ diff --git a/tests/Unit/Adapter/SynchronousAdapterTest.php b/tests/Unit/Adapter/SynchronousAdapterTest.php index f05ac078..c5ffcd84 100644 --- a/tests/Unit/Adapter/SynchronousAdapterTest.php +++ b/tests/Unit/Adapter/SynchronousAdapterTest.php @@ -53,7 +53,7 @@ public function testIdSetting(): void public function testWithSameChannel(): void { $adapter = $this->getAdapter(); - self::assertEquals($adapter, $adapter->withChannel(QueueProviderInterface::DEFAULT_CHANNEL)); + self::assertEquals($adapter, $adapter->withChannel(QueueProviderInterface::DEFAULT_QUEUE)); } public function testWithAnotherChannel(): void diff --git a/tests/Unit/Command/ListenCommandTest.php b/tests/Unit/Command/ListenCommandTest.php index 2e2971ea..46dda71f 100644 --- a/tests/Unit/Command/ListenCommandTest.php +++ b/tests/Unit/Command/ListenCommandTest.php @@ -13,24 +13,57 @@ final class ListenCommandTest extends TestCase { - public function testConfigure(): void + public function testExecuteWithDefaultQueue(): void { - $command = new ListenCommand($this->createMock(QueueProviderInterface::class)); - $channelArgument = $command->getNativeDefinition()->getArgument('channel'); - $this->assertEquals('channel', $channelArgument->getName()); + $queue = $this->createMock(QueueInterface::class); + $queue->expects($this->once()) + ->method('listen'); + + $queueProvider = $this->createMock(QueueProviderInterface::class); + $queueProvider->expects($this->once()) + ->method('get') + ->with($this->equalTo('yii-queue')) + ->willReturn($queue); + + $input = new StringInput(''); + $command = new ListenCommand($queueProvider); + $exitCode = $command->run($input, $this->createMock(OutputInterface::class)); + + $this->assertEquals(0, $exitCode); } - public function testExecute(): void + public function testExecuteWithCustomQueue(): void { $queue = $this->createMock(QueueInterface::class); - $queue->expects($this->once())->method('listen'); - $queueFactory = $this->createMock(QueueProviderInterface::class); - $queueFactory->method('get')->willReturn($queue); - $input = new StringInput('channel'); + $queue->expects($this->once()) + ->method('listen'); - $command = new ListenCommand($queueFactory); + $queueProvider = $this->createMock(QueueProviderInterface::class); + $queueProvider->expects($this->once()) + ->method('get') + ->with($this->equalTo('custom-queue')) + ->willReturn($queue); + + $input = new StringInput('custom-queue'); + $command = new ListenCommand($queueProvider); $exitCode = $command->run($input, $this->createMock(OutputInterface::class)); $this->assertEquals(0, $exitCode); } + + public function testExecuteReturnsZero(): void + { + $queue = $this->createMock(QueueInterface::class); + $queue->expects($this->once()) + ->method('listen'); + + $queueProvider = $this->createMock(QueueProviderInterface::class); + $queueProvider->method('get')->willReturn($queue); + + $input = new StringInput(''); + $command = new ListenCommand($queueProvider); + $exitCode = $command->run($input, $this->createMock(OutputInterface::class)); + + $this->assertSame(0, $exitCode); + } } diff --git a/tests/Unit/Command/RunCommandTest.php b/tests/Unit/Command/RunCommandTest.php index 3e38305e..672db54e 100644 --- a/tests/Unit/Command/RunCommandTest.php +++ b/tests/Unit/Command/RunCommandTest.php @@ -13,23 +13,121 @@ final class RunCommandTest extends TestCase { - public function testConfigure(): void + public function testExecuteWithSingleQueue(): void { - $command = new RunCommand($this->createMock(QueueProviderInterface::class), []); - $channelArgument = $command->getNativeDefinition()->getArgument('channel'); - $this->assertEquals('channel', $channelArgument->getName()); + $queue = $this->createMock(QueueInterface::class); + $queue->expects($this->once()) + ->method('run') + ->with($this->equalTo(0)) + ->willReturn(5); + + $queueProvider = $this->createMock(QueueProviderInterface::class); + $queueProvider->expects($this->once()) + ->method('get') + ->with($this->equalTo('test-queue')) + ->willReturn($queue); + + $input = new StringInput('test-queue'); + $output = $this->createMock(OutputInterface::class); + $output->expects($this->once()) + ->method('write') + ->with($this->equalTo('Processing queue test-queue... ')); + $output->expects($this->once()) + ->method('writeln') + ->with($this->equalTo('Messages processed: 5.')); + + $command = new RunCommand($queueProvider, []); + $exitCode = $command->run($input, $output); + + $this->assertEquals(0, $exitCode); } - public function testExecute(): void + public function testExecuteWithMultipleQueues(): void + { + $queue1 = $this->createMock(QueueInterface::class); + $queue1->expects($this->once()) + ->method('run') + ->with($this->equalTo(0)) + ->willReturn(3); + + $queue2 = $this->createMock(QueueInterface::class); + $queue2->expects($this->once()) + ->method('run') + ->with($this->equalTo(0)) + ->willReturn(7); + + $queueProvider = $this->createMock(QueueProviderInterface::class); + $queueProvider->expects($this->exactly(2)) + ->method('get') + ->willReturnOnConsecutiveCalls($queue1, $queue2); + + $output = $this->createMock(OutputInterface::class); + $output->expects($this->exactly(2)) + ->method('write'); + $output->expects($this->exactly(2)) + ->method('writeln'); + + $input = new StringInput('queue1 queue2'); + $command = new RunCommand($queueProvider, []); + $exitCode = $command->run($input, $output); + + $this->assertEquals(0, $exitCode); + } + + public function testExecuteWithMaximumOption(): void { $queue = $this->createMock(QueueInterface::class); - $queue->expects($this->once())->method('run'); + $queue->expects($this->once()) + ->method('run') + ->with($this->equalTo(100)) + ->willReturn(10); + $queueProvider = $this->createMock(QueueProviderInterface::class); - $queueProvider->method('get')->willReturn($queue); - $input = new StringInput('channel'); + $queueProvider->expects($this->once()) + ->method('get') + ->with($this->equalTo('test-queue')) + ->willReturn($queue); + + $input = new StringInput('test-queue --maximum=100'); + $output = $this->createMock(OutputInterface::class); + $output->expects($this->once()) + ->method('write') + ->with($this->equalTo('Processing queue test-queue... ')); + $output->expects($this->once()) + ->method('writeln') + ->with($this->equalTo('Messages processed: 10.')); $command = new RunCommand($queueProvider, []); - $exitCode = $command->run($input, $this->createMock(OutputInterface::class)); + $exitCode = $command->run($input, $output); + + $this->assertEquals(0, $exitCode); + } + + public function testExecuteWithDefaultQueues(): void + { + $queue = $this->createMock(QueueInterface::class); + $queue->expects($this->once()) + ->method('run') + ->with($this->equalTo(0)) + ->willReturn(2); + + $queueProvider = $this->createMock(QueueProviderInterface::class); + $queueProvider->expects($this->once()) + ->method('get') + ->with($this->equalTo('default-queue')) + ->willReturn($queue); + + $input = new StringInput(''); + $output = $this->createMock(OutputInterface::class); + $output->expects($this->once()) + ->method('write') + ->with($this->equalTo('Processing queue default-queue... ')); + $output->expects($this->once()) + ->method('writeln') + ->with($this->equalTo('Messages processed: 2.')); + + $command = new RunCommand($queueProvider, ['default-queue']); + $exitCode = $command->run($input, $output); $this->assertEquals(0, $exitCode); } diff --git a/tests/Unit/Debug/QueueDecoratorTest.php b/tests/Unit/Debug/QueueDecoratorTest.php index 50ec6eef..8d10822b 100644 --- a/tests/Unit/Debug/QueueDecoratorTest.php +++ b/tests/Unit/Debug/QueueDecoratorTest.php @@ -86,17 +86,17 @@ public function testListen(): void $decorator->listen(); } - public function testGetChannel(): void + public function testGetName(): void { $queue = $this->createMock(QueueInterface::class); - $queue->expects($this->once())->method('getChannel')->willReturn('hello'); + $queue->expects($this->once())->method('getName')->willReturn('hello'); $collector = new QueueCollector(); $decorator = new QueueDecorator( $queue, $collector, ); - $this->assertEquals('hello', $decorator->getChannel()); + $this->assertEquals('hello', $decorator->getName()); } public function testImmutable(): void diff --git a/tests/Unit/Provider/AdapterFactoryQueueProviderTest.php b/tests/Unit/Provider/AdapterFactoryQueueProviderTest.php index 50044f49..d00360cc 100644 --- a/tests/Unit/Provider/AdapterFactoryQueueProviderTest.php +++ b/tests/Unit/Provider/AdapterFactoryQueueProviderTest.php @@ -8,7 +8,7 @@ use Yiisoft\Queue\Adapter\AdapterInterface; use Yiisoft\Queue\Stubs\StubLoop; use Yiisoft\Queue\Provider\AdapterFactoryQueueProvider; -use Yiisoft\Queue\Provider\ChannelNotFoundException; +use Yiisoft\Queue\Provider\QueueNotFoundException; use Yiisoft\Queue\Provider\InvalidQueueConfigException; use Yiisoft\Queue\Stubs\StubQueue; use Yiisoft\Queue\Stubs\StubAdapter; @@ -27,10 +27,11 @@ public function testBase(): void ], ); + /** @var StubQueue $queue */ $queue = $provider->get('channel1'); $this->assertInstanceOf(StubQueue::class, $queue); - $this->assertSame('channel1', $queue->getChannel()); + $this->assertSame('channel1', $queue->getName()); $this->assertInstanceOf(StubAdapter::class, $queue->getAdapter()); $this->assertTrue($provider->has('channel1')); $this->assertFalse($provider->has('not-exist-channel')); @@ -60,8 +61,8 @@ public function testGetNotExistChannel(): void ], ); - $this->expectException(ChannelNotFoundException::class); - $this->expectExceptionMessage('Channel "not-exist-channel" not found.'); + $this->expectException(QueueNotFoundException::class); + $this->expectExceptionMessage('Queue with name "not-exist-channel" not found.'); $provider->get('not-exist-channel'); } @@ -113,7 +114,7 @@ public function testGetHasByStringEnum(): void $queue = $provider->get(StringEnum::RED); - $this->assertSame('red', $queue->getChannel()); + $this->assertSame('red', $queue->getName()); $this->assertTrue($provider->has(StringEnum::RED)); $this->assertFalse($provider->has(StringEnum::GREEN)); } diff --git a/tests/Unit/Provider/CompositeQueueProviderTest.php b/tests/Unit/Provider/CompositeQueueProviderTest.php index 2c2e5c39..9c28931f 100644 --- a/tests/Unit/Provider/CompositeQueueProviderTest.php +++ b/tests/Unit/Provider/CompositeQueueProviderTest.php @@ -5,7 +5,7 @@ namespace Yiisoft\Queue\Tests\Unit\Provider; use Yiisoft\Queue\Provider\AdapterFactoryQueueProvider; -use Yiisoft\Queue\Provider\ChannelNotFoundException; +use Yiisoft\Queue\Provider\QueueNotFoundException; use Yiisoft\Queue\Provider\CompositeQueueProvider; use Yiisoft\Queue\Stubs\StubAdapter; use Yiisoft\Queue\Stubs\StubQueue; @@ -31,8 +31,8 @@ public function testBase(): void $this->assertTrue($provider->has('channel2')); $this->assertFalse($provider->has('channel3')); - $this->assertSame('channel1', $provider->get('channel1')->getChannel()); - $this->assertSame('channel2', $provider->get('channel2')->getChannel()); + $this->assertSame('channel1', $provider->get('channel1')->getName()); + $this->assertSame('channel2', $provider->get('channel2')->getName()); } public function testNotFound(): void @@ -44,7 +44,7 @@ public function testNotFound(): void ), ); - $this->expectException(ChannelNotFoundException::class); + $this->expectException(QueueNotFoundException::class); $this->expectExceptionMessage('Channel "not-exists" not found.'); $provider->get('not-exists'); } diff --git a/tests/Unit/Provider/PrototypeQueueProviderTest.php b/tests/Unit/Provider/PrototypeQueueProviderTest.php index 3429ee19..70816d3e 100644 --- a/tests/Unit/Provider/PrototypeQueueProviderTest.php +++ b/tests/Unit/Provider/PrototypeQueueProviderTest.php @@ -21,7 +21,7 @@ public function testBase(): void $queue = $provider->get('test-channel'); $this->assertInstanceOf(StubQueue::class, $queue); - $this->assertSame('test-channel', $queue->getChannel()); + $this->assertSame('test-channel', $queue->getName()); $this->assertTrue($provider->has('test-channel')); $this->assertTrue($provider->has('yet-another-channel')); } diff --git a/tests/Unit/Provider/ChannelNotFoundExceptionTest.php b/tests/Unit/Provider/QueueNotFoundExceptionTest.php similarity index 78% rename from tests/Unit/Provider/ChannelNotFoundExceptionTest.php rename to tests/Unit/Provider/QueueNotFoundExceptionTest.php index c7bf0bed..e047fb0e 100644 --- a/tests/Unit/Provider/ChannelNotFoundExceptionTest.php +++ b/tests/Unit/Provider/QueueNotFoundExceptionTest.php @@ -5,11 +5,11 @@ namespace Yiisoft\Queue\Tests\Unit\Provider; use PHPUnit\Framework\Attributes\DataProvider; -use Yiisoft\Queue\Provider\ChannelNotFoundException; +use Yiisoft\Queue\Provider\QueueNotFoundException; use Yiisoft\Queue\Tests\TestCase; use Yiisoft\Queue\Tests\Unit\Support\StringEnum; -final class ChannelNotFoundExceptionTest extends TestCase +final class QueueNotFoundExceptionTest extends TestCase { public static function dataBase(): iterable { @@ -20,7 +20,7 @@ public static function dataBase(): iterable #[DataProvider('dataBase')] public function testBase(string $expectedChannel, mixed $channel): void { - $exception = new ChannelNotFoundException($channel); + $exception = new QueueNotFoundException($channel); $this->assertSame( 'Channel "' . $expectedChannel . '" not found.', diff --git a/tests/Unit/QueueTest.php b/tests/Unit/QueueTest.php index a3c5cc1d..603b6013 100644 --- a/tests/Unit/QueueTest.php +++ b/tests/Unit/QueueTest.php @@ -152,7 +152,7 @@ public function testGetChannel(): void ->getQueue() ->withAdapter(new StubAdapter('test-channel')); - $this->assertSame('test-channel', $queue->getChannel()); + $this->assertSame('test-channel', $queue->getName()); } public function testGetChannelWithoutAdapter(): void diff --git a/tests/Unit/WorkerTest.php b/tests/Unit/WorkerTest.php index 5febf399..eb960c47 100644 --- a/tests/Unit/WorkerTest.php +++ b/tests/Unit/WorkerTest.php @@ -201,7 +201,7 @@ public function testJobFailureIsHandledSuccessfully(): void $message = new Message('simple', null); /** @var MockObject&QueueInterface $queue */ $queue = $this->createMock(QueueInterface::class); - $queue->method('getChannel')->willReturn('test-channel'); + $queue->method('getName')->willReturn('test-queue'); $originalException = new RuntimeException('Consume failed'); /** @var MiddlewareConsumeInterface&MockObject $consumeMiddleware */ @@ -221,7 +221,7 @@ public function testJobFailureIsHandledSuccessfully(): void /** @var MiddlewareFactoryFailureInterface&MockObject $failureMiddlewareFactory */ $failureMiddlewareFactory = $this->createMock(MiddlewareFactoryFailureInterface::class); $failureMiddlewareFactory->method('createFailureMiddleware')->willReturn($failureMiddleware); - $failureDispatcher = new FailureMiddlewareDispatcher($failureMiddlewareFactory, ['test-channel' => ['simple']]); + $failureDispatcher = new FailureMiddlewareDispatcher($failureMiddlewareFactory, ['test-queue' => ['simple']]); $worker = new Worker( ['simple' => fn() => null],