Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 29 additions & 5 deletions src/Adapter/AdapterInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,22 @@
use Yiisoft\Queue\MessageStatus;
use Yiisoft\Queue\Message\MessageInterface;

/**
* Adapter interface for handling queue operations.
*/
interface AdapterInterface
{
/**
* Returns the first message from the queue if it exists (null otherwise).
* Handle all existing messages in the queue.
*
* @param callable(MessageInterface): bool $handlerCallback The handler which will handle messages. Returns false if it cannot continue handling messages
* Processes all messages that are currently in the queue. The message passed to the handler callback
* is guaranteed to not be an {@see Envelope} instance, only a plain {@see MessageInterface} with
* merged metadata from any wrapping envelopes.
*
* @param callable $handlerCallback The handler which will handle messages (never an {@see Envelope} instance).
* Returns `false` if it cannot continue handling messages.
Comment on lines +18 to +23
*
* @psalm-param callable(MessageInterface): bool $handlerCallback
*/
public function runExisting(callable $handlerCallback): void;

Expand All @@ -25,14 +35,28 @@ public function runExisting(callable $handlerCallback): void;
public function status(string|int $id): MessageStatus;

/**
* Pushing a message to the queue. Adapter sets message ID if available.
* Push a message to the queue.
*
* Adds a message to the queue. The message is guaranteed to not be an {@see Envelope} instance,
* only a plain {@see MessageInterface} with merged metadata from any wrapping envelopes.
*
* @param MessageInterface $message The message to push. Never an {@see Envelope} instance.
* @return MessageInterface The message with any modifications made by the adapter. May be wrapped in
* an {@see Envelope} to carry additional metadata.
Comment on lines +40 to +45
*/
public function push(MessageInterface $message): MessageInterface;

/**
* Listen to the queue and pass messages to the given handler as they come.
* Subscribe to the queue and process messages as they arrive.
*
* Listens to the queue and passes messages to the given handler as they become available.
* The message passed to the handler callback is guaranteed to not be an {@see Envelope} instance,
* only a plain {@see MessageInterface} with merged metadata from any wrapping envelopes.
*
* @param callable $handlerCallback The handler which will handle messages (never an {@see Envelope} instance).
* Returns `false` if it cannot continue handling messages.
Comment on lines +52 to +57
*
* @param callable(MessageInterface): bool $handlerCallback The handler which will handle messages. Returns false if it cannot continue handling messages.
* @psalm-param callable(MessageInterface): bool $handlerCallback
*/
public function subscribe(callable $handlerCallback): void;
}
4 changes: 1 addition & 3 deletions src/Message/JsonMessageSerializer.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ public function serialize(MessageInterface $message): string
'meta' => $message->getMetadata(),
];
if (!isset($payload['meta']['message-class'])) {
$payload['meta']['message-class'] = $message instanceof Envelope
? $message->getMessage()::class
: $message::class;
$payload['meta']['message-class'] = $message::class;
}

return json_encode($payload, JSON_THROW_ON_ERROR);
Expand Down
21 changes: 21 additions & 0 deletions src/Message/MessageSerializerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,30 @@

namespace Yiisoft\Queue\Message;

/**
* Message serializer interface for converting messages to and from string representation.
*/
interface MessageSerializerInterface
{
/**
* Serialize a message to a string.
*
* Converts a message to its string representation for storage or transmission. The message passed to this method is
* guaranteed to not be an {@see Envelope} instance, only a plain {@see MessageInterface} with merged metadata from
* any wrapping envelopes.
*
* @param MessageInterface $message The message to serialize. Never an {@see Envelope} instance.
* @return string The serialized message.
*/
public function serialize(MessageInterface $message): string;

/**
* Unserialize a message from a string.
*
* Converts a string representation back to a {@see MessageInterface} instance.
*
* @param string $value The serialized message string.
* @return MessageInterface The deserialized message.
*/
public function unserialize(string $value): MessageInterface;
}
5 changes: 4 additions & 1 deletion src/Middleware/Push/AdapterPushHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Yiisoft\Queue\Middleware\Push;

use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\Message\Envelope;
use Yiisoft\Queue\Message\MessageInterface;

/**
Expand All @@ -18,6 +19,8 @@ public function __construct(

public function handlePush(MessageInterface $message): MessageInterface
{
return $this->adapter->push($message);
return $this->adapter->push(
$message instanceof Envelope ? $message->getMessage() : $message,
);
Comment on lines +22 to +24
}
}
11 changes: 4 additions & 7 deletions tests/Benchmark/QueueBench.php
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,10 @@ public function provideConsume(): Generator
yield 'simple mapping' => ['message' => $this->serializer->serialize(new GenericMessage('foo', 'bar'))];
yield 'with envelopes mapping' => [
'message' => $this->serializer->serialize(
new FailureEnvelope(
new IdEnvelope(
new GenericMessage('foo', 'bar'),
'test id',
),
['failure-1' => ['a', 'b', 'c']],
),
(new GenericMessage('foo', 'bar'))->withMetadata([
IdEnvelope::META_ID => 'test id',
FailureEnvelope::META_FAILURE => ['failure-1' => ['a', 'b', 'c']],
]),
),
];
}
Expand Down
17 changes: 9 additions & 8 deletions tests/Unit/Message/JsonMessageSerializerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,10 @@ public function testSerialize(): void
);
}

public function testSerializeEnvelopeStack(): void
public function testSerializeWithMetadata(): void
{
$message = new GenericMessage('handler', 'test');
$message = new IdEnvelope($message, 'test-id');
$message = (new GenericMessage('handler', 'test'))
->withMetadata([IdEnvelope::META_ID => 'test-id']);

$serializer = $this->createSerializer();

Expand Down Expand Up @@ -178,14 +178,15 @@ public function testRestoreOriginalMessageClass(): void
$this->assertInstanceOf(TestMessage::class, $message);
}

public function testRestoreOriginalMessageClassWithEnvelope(): void
public function testRestoreOriginalMessageClassWithMetadata(): void
{
$message = new IdEnvelope(new TestMessage(), 1);
$message = (new TestMessage())
->withMetadata(['id' => 1]);
$serializer = $this->createSerializer();
Comment on lines +183 to 185
$serializer->unserialize($serializer->serialize($message));
$unserialized = $serializer->unserialize($serializer->serialize($message));

$this->assertInstanceOf(IdEnvelope::class, $message);
$this->assertInstanceOf(TestMessage::class, $message->getMessage());
$this->assertInstanceOf(TestMessage::class, $unserialized);
$this->assertEquals(['id' => 1, 'message-class' => TestMessage::class], $unserialized->getMetadata());
}

private function createSerializer(): JsonMessageSerializer
Expand Down
Loading