Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions config/di.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
return [
AdapterFactoryQueueProvider::class => [
'__construct()' => [
'definitions' => $params['yiisoft/queue']['channels'],
'definitions' => $params['yiisoft/queue']['queues'],
],
],
QueueProviderInterface::class => AdapterFactoryQueueProvider::class,
Expand Down Expand Up @@ -61,12 +61,12 @@
MessageSerializerInterface::class => JsonMessageSerializer::class,
RunCommand::class => [
'__construct()' => [
'channels' => array_keys($params['yiisoft/queue']['channels']),
'queues' => array_keys($params['yiisoft/queue']['queues']),
],
],
ListenAllCommand::class => [
'__construct()' => [
'channels' => array_keys($params['yiisoft/queue']['channels']),
'queues' => array_keys($params['yiisoft/queue']['queues']),
],
],
];
4 changes: 2 additions & 2 deletions config/params.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
],
'yiisoft/queue' => [
'handlers' => [],
'channels' => [
QueueProviderInterface::DEFAULT_CHANNEL => AdapterInterface::class,
'queues' => [
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"queues" define adapters, not queues. It's unexpected,

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change is correct; it's

queue1 => adapter1, channel 1
queue2 => adapter1, channel 2
queue3 => adapter2

QueueProviderInterface::DEFAULT_QUEUE => AdapterInterface::class,
],
'middlewares-push' => [],
'middlewares-consume' => [],
Expand Down
8 changes: 4 additions & 4 deletions src/Adapter/SynchronousAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use BackedEnum;
use InvalidArgumentException;
use Yiisoft\Queue\ChannelNormalizer;
use Yiisoft\Queue\QueueNameNormalizer;
use Yiisoft\Queue\JobStatus;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Provider\QueueProviderInterface;
Expand All @@ -25,9 +25,9 @@ final class SynchronousAdapter implements AdapterInterface
public function __construct(
private readonly WorkerInterface $worker,
private readonly QueueInterface $queue,
string|BackedEnum $channel = QueueProviderInterface::DEFAULT_CHANNEL,
string|BackedEnum $channel = QueueProviderInterface::DEFAULT_QUEUE,
) {
$this->channel = ChannelNormalizer::normalize($channel);
$this->channel = QueueNameNormalizer::normalize($channel);
}

public function __destruct()
Expand Down Expand Up @@ -83,7 +83,7 @@ public function subscribe(callable $handlerCallback): void

public function withChannel(string|BackedEnum $channel): self
{
$channel = ChannelNormalizer::normalize($channel);
$channel = QueueNameNormalizer::normalize($channel);

if ($channel === $this->channel) {
return $this;
Expand Down
18 changes: 0 additions & 18 deletions src/ChannelNormalizer.php

This file was deleted.

20 changes: 10 additions & 10 deletions src/Command/ListenAllCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ final class ListenAllCommand extends Command
public function __construct(
private readonly QueueProviderInterface $queueProvider,
private readonly LoopInterface $loop,
private readonly array $channels,
private readonly array $queues,
) {
parent::__construct();
}
Expand All @@ -36,36 +36,36 @@ public function __construct(
public function configure(): void
{
$this->addArgument(
'channel',
'queue',
InputArgument::OPTIONAL | InputArgument::IS_ARRAY,
'Queue channel name list to connect to',
$this->channels,
'Queue name list to connect to',
$this->queues,
)
->addOption(
'pause',
'p',
InputOption::VALUE_REQUIRED,
'Pause between queue channel iterations in seconds. May save some CPU. Default: 1',
'Pause between queue iterations in seconds. May save some CPU. Default: 1',
1,
)
->addOption(
'maximum',
'm',
InputOption::VALUE_REQUIRED,
'Maximum number of messages to process in each channel before switching to another channel. '
'Maximum number of messages to process in each queue before switching to another queue. '
. 'Default is 0 (no limits).',
0,
);

$this->addUsage('[channel1 [channel2 [...]]] [--timeout=<timeout>] [--maximum=<maximum>]');
$this->addUsage('[queue1 [queue2 [...]]] [--timeout=<timeout>] [--maximum=<maximum>]');
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$queues = [];
/** @var string $channel */
foreach ($input->getArgument('channel') as $channel) {
$queues[] = $this->queueProvider->get($channel);
/** @var string $queue */
foreach ($input->getArgument('queue') as $queue) {
$queues[] = $this->queueProvider->get($queue);
}

$pauseSeconds = (int) $input->getOption('pause');
Expand Down
8 changes: 4 additions & 4 deletions src/Command/ListenCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@ public function __construct(
public function configure(): void
{
$this->addArgument(
'channel',
'queue',
InputArgument::OPTIONAL,
'Queue channel name to connect to',
QueueProviderInterface::DEFAULT_CHANNEL,
'Queue name to connect to',
QueueProviderInterface::DEFAULT_QUEUE,
);
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->queueProvider
->get($input->getArgument('channel'))
->get($input->getArgument('queue'))
->listen();

return 0;
Expand Down
20 changes: 10 additions & 10 deletions src/Command/RunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,36 +20,36 @@ final class RunCommand extends Command
{
public function __construct(
private readonly QueueProviderInterface $queueProvider,
private readonly array $channels,
private readonly array $queues,
) {
parent::__construct();
}

public function configure(): void
{
$this->addArgument(
'channel',
'queue',
InputArgument::OPTIONAL | InputArgument::IS_ARRAY,
'Queue channel name list to connect to.',
$this->channels,
'Queue name list to connect to.',
$this->queues,
)
->addOption(
'maximum',
'm',
InputOption::VALUE_REQUIRED,
'Maximum number of messages to process in each channel. Default is 0 (no limits).',
'Maximum number of messages to process in each queue. Default is 0 (no limits).',
0,
)
->addUsage('[channel1 [channel2 [...]]] --maximum 100');
->addUsage('[queue1 [queue2 [...]]] --maximum 100');
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
/** @var string $channel */
foreach ($input->getArgument('channel') as $channel) {
$output->write("Processing channel $channel... ");
/** @var string $queue */
foreach ($input->getArgument('queue') as $queue) {
$output->write("Processing queue $queue... ");
$count = $this->queueProvider
->get($channel)
->get($queue)
->run((int) $input->getOption('maximum'));

$output->writeln("Messages processed: $count.");
Expand Down
10 changes: 5 additions & 5 deletions src/Debug/QueueCollector.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,18 @@ public function collectStatus(string $id, JobStatus $status): void
}

public function collectPush(
?string $channel,
?string $queueName,
MessageInterface $message,
string|array|callable|MiddlewarePushInterface ...$middlewareDefinitions,
): void {
if (!$this->isActive()) {
return;
}
if ($channel === null) {
$channel = 'null';
if ($queueName === null) {
$queueName = 'null';
}

$this->pushes[$channel][] = [
$this->pushes[$queueName][] = [
'message' => $message,
'middlewares' => $middlewareDefinitions,
];
Expand All @@ -69,7 +69,7 @@ public function collectWorkerProcessing(MessageInterface $message, QueueInterfac
if (!$this->isActive()) {
return;
}
$this->processingMessages[$queue->getChannel()][] = $message;
$this->processingMessages[$queue->getName()][] = $message;
}

public function getSummary(): array
Expand Down
11 changes: 6 additions & 5 deletions src/Debug/QueueDecorator.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Yiisoft\Queue\Debug;

use BackedEnum;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\JobStatus;
use Yiisoft\Queue\Message\MessageInterface;
Expand All @@ -30,7 +31,7 @@ public function push(
string|array|callable|MiddlewarePushInterface ...$middlewareDefinitions,
): MessageInterface {
$message = $this->queue->push($message, ...$middlewareDefinitions);
$this->collector->collectPush($this->queue->getChannel(), $message, ...$middlewareDefinitions);
$this->collector->collectPush($this->queue->getName(), $message, ...$middlewareDefinitions);
return $message;
}

Expand All @@ -44,13 +45,13 @@ public function listen(): void
$this->queue->listen();
}

public function withAdapter(AdapterInterface $adapter): static
public function withAdapter(AdapterInterface $adapter, string|BackedEnum|null $queueName = null): static
{
return new self($this->queue->withAdapter($adapter), $this->collector);
return new self($this->queue->withAdapter($adapter, $queueName), $this->collector);
}

public function getChannel(): string
public function getName(): string
{
return $this->queue->getChannel();
return $this->queue->getName();
}
}
9 changes: 5 additions & 4 deletions src/Debug/QueueProviderInterfaceProxy.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
private readonly QueueCollector $collector,
) {}

public function get(string|BackedEnum $channel): QueueInterface
public function get(string|BackedEnum $queue): QueueInterface

Check failure on line 18 in src/Debug/QueueProviderInterfaceProxy.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.2-ubuntu-latest

ParamNameMismatch

src/Debug/QueueProviderInterfaceProxy.php:18:43: ParamNameMismatch: Argument 1 of Yiisoft\Queue\Debug\QueueProviderInterfaceProxy::get has wrong name $queue, expecting $queueName as defined by Yiisoft\Queue\Provider\QueueProviderInterface::get (see https://psalm.dev/230)

Check failure on line 18 in src/Debug/QueueProviderInterfaceProxy.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.1-ubuntu-latest

ParamNameMismatch

src/Debug/QueueProviderInterfaceProxy.php:18:43: ParamNameMismatch: Argument 1 of Yiisoft\Queue\Debug\QueueProviderInterfaceProxy::get has wrong name $queue, expecting $queueName as defined by Yiisoft\Queue\Provider\QueueProviderInterface::get (see https://psalm.dev/230)

Check failure on line 18 in src/Debug/QueueProviderInterfaceProxy.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.4-ubuntu-latest

ParamNameMismatch

src/Debug/QueueProviderInterfaceProxy.php:18:43: ParamNameMismatch: Argument 1 of Yiisoft\Queue\Debug\QueueProviderInterfaceProxy::get has wrong name $queue, expecting $queueName as defined by Yiisoft\Queue\Provider\QueueProviderInterface::get (see https://psalm.dev/230)

Check failure on line 18 in src/Debug/QueueProviderInterfaceProxy.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.3-ubuntu-latest

ParamNameMismatch

src/Debug/QueueProviderInterfaceProxy.php:18:43: ParamNameMismatch: Argument 1 of Yiisoft\Queue\Debug\QueueProviderInterfaceProxy::get has wrong name $queue, expecting $queueName as defined by Yiisoft\Queue\Provider\QueueProviderInterface::get (see https://psalm.dev/230)
{
$queue = $this->queueProvider->get($channel);
$queue = $this->queueProvider->get($queue);

return new QueueDecorator($queue, $this->collector);
}

public function has(string|BackedEnum $channel): bool
public function has(string|BackedEnum $queue): bool

Check failure on line 25 in src/Debug/QueueProviderInterfaceProxy.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.2-ubuntu-latest

ParamNameMismatch

src/Debug/QueueProviderInterfaceProxy.php:25:43: ParamNameMismatch: Argument 1 of Yiisoft\Queue\Debug\QueueProviderInterfaceProxy::has has wrong name $queue, expecting $queueName as defined by Yiisoft\Queue\Provider\QueueProviderInterface::has (see https://psalm.dev/230)

Check failure on line 25 in src/Debug/QueueProviderInterfaceProxy.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.1-ubuntu-latest

ParamNameMismatch

src/Debug/QueueProviderInterfaceProxy.php:25:43: ParamNameMismatch: Argument 1 of Yiisoft\Queue\Debug\QueueProviderInterfaceProxy::has has wrong name $queue, expecting $queueName as defined by Yiisoft\Queue\Provider\QueueProviderInterface::has (see https://psalm.dev/230)

Check failure on line 25 in src/Debug/QueueProviderInterfaceProxy.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.4-ubuntu-latest

ParamNameMismatch

src/Debug/QueueProviderInterfaceProxy.php:25:43: ParamNameMismatch: Argument 1 of Yiisoft\Queue\Debug\QueueProviderInterfaceProxy::has has wrong name $queue, expecting $queueName as defined by Yiisoft\Queue\Provider\QueueProviderInterface::has (see https://psalm.dev/230)

Check failure on line 25 in src/Debug/QueueProviderInterfaceProxy.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.3-ubuntu-latest

ParamNameMismatch

src/Debug/QueueProviderInterfaceProxy.php:25:43: ParamNameMismatch: Argument 1 of Yiisoft\Queue\Debug\QueueProviderInterfaceProxy::has has wrong name $queue, expecting $queueName as defined by Yiisoft\Queue\Provider\QueueProviderInterface::has (see https://psalm.dev/230)
{
return $this->queueProvider->has($channel);
return $this->queueProvider->has($queue);
}
}
15 changes: 7 additions & 8 deletions src/Middleware/FailureHandling/FailureMiddlewareDispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,17 @@ public function dispatch(
FailureHandlingRequest $request,
MessageFailureHandlerInterface $finishHandler,
): FailureHandlingRequest {
/** @var string $channel It is always string in this context */
$channel = $request->getQueue()->getChannel();
if (!isset($this->middlewareDefinitions[$channel]) || $this->middlewareDefinitions[$channel] === []) {
$channel = self::DEFAULT_PIPELINE;
$queueName = $request->getQueue()->getName();
if (!isset($this->middlewareDefinitions[$queueName]) || $this->middlewareDefinitions[$queueName] === []) {
$queueName = self::DEFAULT_PIPELINE;
}
$definitions = array_reverse($this->middlewareDefinitions[$channel]);
$definitions = array_reverse($this->middlewareDefinitions[$queueName]);

if (!isset($this->stack[$channel])) {
$this->stack[$channel] = new MiddlewareFailureStack($this->buildMiddlewares(...$definitions), $finishHandler);
if (!isset($this->stack[$queueName])) {
$this->stack[$queueName] = new MiddlewareFailureStack($this->buildMiddlewares(...$definitions), $finishHandler);
}

return $this->stack[$channel]->handleFailure($request);
return $this->stack[$queueName]->handleFailure($request);
}

/**
Expand Down
Loading
Loading