From 17dd61eeb1accc49da83b4a40ccec88af53e69c9 Mon Sep 17 00:00:00 2001 From: Vincent QUATREVIEUX Date: Wed, 8 Oct 2025 15:09:48 +0200 Subject: [PATCH 01/11] ci: Add build for pheanstalk compatibility --- .github/workflows/php.yml | 36 +++++++ composer.json | 2 +- .../Pheanstalk/PheanstalkFunctionalTest.php | 97 +++++++++++++++++++ 3 files changed, 134 insertions(+), 1 deletion(-) create mode 100644 tests/Connection/Pheanstalk/PheanstalkFunctionalTest.php diff --git a/.github/workflows/php.yml b/.github/workflows/php.yml index bae188b..8229412 100644 --- a/.github/workflows/php.yml +++ b/.github/workflows/php.yml @@ -40,6 +40,42 @@ jobs: - name: Run test suite run: composer run-script tests + pheanstalk_compatibility: + runs-on: ubuntu-latest + strategy: + matrix: + pheanstalk-versions: ['3.2.1'] + name: Pheanstalk ${{ matrix.pheanstalk-versions }} + + steps: + - uses: actions/checkout@v2 + + - name: Set Timezone + uses: szenius/set-timezone@v1.0 + with: + timezoneLinux: "Europe/Paris" + + - name: Install PHP + uses: shivammathur/setup-php@v2 + with: + php-version: 8.4 + extensions: json + ini-values: date.timezone=Europe/Paris + + - name: Install & run Beanstalkd + run: | + sudo apt-get install -y beanstalkd + sudo service beanstalkd start + + - name: Install dependencies + run: composer install --prefer-dist --no-progress + + - name: Require Pheanstalk version + run: composer require --dev "pda/pheanstalk:~${{ matrix.pheanstalk-versions }}" + + - name: Run test suite + run: composer run-script tests + analysis: name: Analysis runs-on: ubuntu-latest diff --git a/composer.json b/composer.json index 3955864..9bc6b25 100644 --- a/composer.json +++ b/composer.json @@ -37,7 +37,7 @@ "enqueue/fs": "~0.9", "league/container": "~3.0", "monolog/monolog": "~2.0", - "pda/pheanstalk": "^3.1@dev", + "pda/pheanstalk": "~3.2", "php-amqplib/php-amqplib": "~3.0", "phpbench/phpbench": "~0.0|~1.0", "phpunit/phpunit": "~9.6", diff --git a/tests/Connection/Pheanstalk/PheanstalkFunctionalTest.php b/tests/Connection/Pheanstalk/PheanstalkFunctionalTest.php new file mode 100644 index 0000000..d7204a5 --- /dev/null +++ b/tests/Connection/Pheanstalk/PheanstalkFunctionalTest.php @@ -0,0 +1,97 @@ +markTestSkipped('Pheanstalk server is not available'); + } + + $this->connection = new PheanstalkConnection('test', new JsonSerializer()); + $this->connection->setConfig([ + 'host' => $host, + 'port' => $port, + ]); + } + + public function test_queuePushPop() + { + $queue = $this->connection->queue(); + $queue->push( + (new Message(['foo' => 'bar'])) + ->setQueue('test') + ); + + $message = $queue->pop('test'); + $this->assertEquals('test', $message->message()->destination()); + $this->assertSame(['foo' => 'bar'], $message->message()->data()); + $this->assertFalse($message->isRejected()); + $this->assertFalse($message->isDeleted()); + + $message->acknowledge(); + + $this->assertNull($queue->pop('test')); + } + + public function test_queueRelease() + { + $queue = $this->connection->queue(); + $queue->push( + (new Message(['foo' => 'bar'])) + ->setQueue('test') + ); + + $message = $queue->pop('test'); + $this->assertSame(['foo' => 'bar'], $message->message()->data()); + $this->assertFalse($message->isRejected()); + $this->assertFalse($message->isDeleted()); + + $queue->release($message->message()); + $this->assertEquals($message, $queue->pop('test')); + $this->assertFalse($message->isRejected()); + $this->assertFalse($message->isDeleted()); + + $message->acknowledge(); + } + + public function test_topic() + { + $messages = []; + + $topic = $this->connection->topic(); + $topic->subscribe(['topic.test'], function (QueueEnvelope $envelope) use (&$messages) { + $messages[] = $envelope->message()->data(); + $envelope->acknowledge(); + }); + + if (pcntl_fork() === 0) { + usleep(50000); + $topic->publish( + (new Message(['foo' => 'bar'])) + ->setTopic('topic.test') + ); + exit; + } + + $topic->consume(2); + $this->assertEquals([['foo' => 'bar']], $messages); + } +} From cc54b9e5e21b811480f472e365ed93fcd6ae4271 Mon Sep 17 00:00:00 2001 From: Vincent QUATREVIEUX Date: Wed, 8 Oct 2025 15:19:25 +0200 Subject: [PATCH 02/11] ci: Try to launch beanstalkd on github action --- .github/workflows/php.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/php.yml b/.github/workflows/php.yml index 8229412..3dd9325 100644 --- a/.github/workflows/php.yml +++ b/.github/workflows/php.yml @@ -65,7 +65,7 @@ jobs: - name: Install & run Beanstalkd run: | sudo apt-get install -y beanstalkd - sudo service beanstalkd start + sudo systemctl start beanstalkd - name: Install dependencies run: composer install --prefer-dist --no-progress From ba4a86c3ebb728c2a649354d4c5c65a65a9e0840 Mon Sep 17 00:00:00 2001 From: Vincent QUATREVIEUX Date: Wed, 8 Oct 2025 15:22:17 +0200 Subject: [PATCH 03/11] ci: Try to launch beanstalkd on github action --- .github/workflows/php.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/php.yml b/.github/workflows/php.yml index 3dd9325..adf4f59 100644 --- a/.github/workflows/php.yml +++ b/.github/workflows/php.yml @@ -65,7 +65,7 @@ jobs: - name: Install & run Beanstalkd run: | sudo apt-get install -y beanstalkd - sudo systemctl start beanstalkd + sudo systemctl start beanstalkd.service - name: Install dependencies run: composer install --prefer-dist --no-progress From f667cda41f0be65e3b4a08613742a38ccdf67f43 Mon Sep 17 00:00:00 2001 From: Vincent QUATREVIEUX Date: Wed, 8 Oct 2025 16:47:03 +0200 Subject: [PATCH 04/11] ci: Try to launch beanstalkd on github action --- .github/workflows/php.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/php.yml b/.github/workflows/php.yml index adf4f59..0675a85 100644 --- a/.github/workflows/php.yml +++ b/.github/workflows/php.yml @@ -65,6 +65,8 @@ jobs: - name: Install & run Beanstalkd run: | sudo apt-get install -y beanstalkd + sudo systemctl enable beanstalkd.service + sudo systemctl enable beanstalkd.socket sudo systemctl start beanstalkd.service - name: Install dependencies From 32da10c8e54a0b8afc0cd7c7e0807994f1cc7c93 Mon Sep 17 00:00:00 2001 From: Vincent QUATREVIEUX Date: Wed, 8 Oct 2025 16:49:43 +0200 Subject: [PATCH 05/11] ci: simplify beanstalkd launch --- .github/workflows/php.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/php.yml b/.github/workflows/php.yml index 0675a85..adf4f59 100644 --- a/.github/workflows/php.yml +++ b/.github/workflows/php.yml @@ -65,8 +65,6 @@ jobs: - name: Install & run Beanstalkd run: | sudo apt-get install -y beanstalkd - sudo systemctl enable beanstalkd.service - sudo systemctl enable beanstalkd.socket sudo systemctl start beanstalkd.service - name: Install dependencies From 073d68f7ec108020c2e8ac7d514b29c0f326a4b4 Mon Sep 17 00:00:00 2001 From: Vincent QUATREVIEUX Date: Wed, 8 Oct 2025 18:00:52 +0200 Subject: [PATCH 06/11] feat(pheanstalk): Compatibility with v4.0 --- .github/workflows/php.yml | 2 +- composer.json | 2 +- .../Pheanstalk/PheanstalkConnection.php | 31 ++++++++++++++++--- src/Connection/Pheanstalk/PheanstalkQueue.php | 28 ++++++++++++----- .../Pheanstalk/PheanstalkConnectionTest.php | 10 +++++- .../Pheanstalk/PheanstalkFunctionalTest.php | 2 +- .../Pheanstalk/PheanstalkQueueTest.php | 30 +++++++++++++++--- 7 files changed, 83 insertions(+), 22 deletions(-) diff --git a/.github/workflows/php.yml b/.github/workflows/php.yml index adf4f59..325f2b6 100644 --- a/.github/workflows/php.yml +++ b/.github/workflows/php.yml @@ -44,7 +44,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - pheanstalk-versions: ['3.2.1'] + pheanstalk-versions: ['3.2.1', '4.0'] name: Pheanstalk ${{ matrix.pheanstalk-versions }} steps: diff --git a/composer.json b/composer.json index 9bc6b25..61d4004 100644 --- a/composer.json +++ b/composer.json @@ -37,7 +37,7 @@ "enqueue/fs": "~0.9", "league/container": "~3.0", "monolog/monolog": "~2.0", - "pda/pheanstalk": "~3.2", + "pda/pheanstalk": "~3.2|~4.0", "php-amqplib/php-amqplib": "~3.0", "phpbench/phpbench": "~0.0|~1.0", "phpunit/phpunit": "~9.6", diff --git a/src/Connection/Pheanstalk/PheanstalkConnection.php b/src/Connection/Pheanstalk/PheanstalkConnection.php index 86ca61d..f5df236 100644 --- a/src/Connection/Pheanstalk/PheanstalkConnection.php +++ b/src/Connection/Pheanstalk/PheanstalkConnection.php @@ -11,9 +11,19 @@ use Bdf\Queue\Message\MessageSerializationTrait; use Bdf\Queue\Serializer\SerializerInterface; use Bdf\Queue\Util\MultiServer; -use Pheanstalk\Connection; use Pheanstalk\Pheanstalk; -use Pheanstalk\PheanstalkInterface; +use Pheanstalk\Contract\PheanstalkInterface; + +use function class_alias; +use function fclose; +use function fsockopen; +use function interface_exists; +use function method_exists; + +// Support for Pheanstalk 3 +if (!interface_exists(PheanstalkInterface::class)) { + class_alias(\Pheanstalk\PheanstalkInterface::class, PheanstalkInterface::class); +} /** * PheanstalkConnection @@ -84,7 +94,12 @@ public function pheanstalk(): PheanstalkInterface // Set the first available server // Pheanstalk manage a lazy connection. We can instantiate the client here. foreach ($this->getActiveHost() as $host => $port) { - $this->pheanstalk = new Pheanstalk($host, $port, $this->config['client-timeout']); + if (method_exists(Pheanstalk::class, 'create')) { + $this->pheanstalk = Pheanstalk::create($host, (int) $port, (int) ($this->config['client-timeout'] ?? 10)); + } else { + // Pheanstalk 3 + $this->pheanstalk = new Pheanstalk($host, $port, $this->config['client-timeout']); + } break; } } @@ -108,7 +123,10 @@ public function setPheanstalk(PheanstalkInterface $pheanstalk) public function close(): void { if ($this->pheanstalk !== null) { - $this->pheanstalk->getConnection()->disconnect(); + if (method_exists($this->pheanstalk, 'getConnection')) { + $this->pheanstalk->getConnection()->disconnect(); + } + $this->pheanstalk = null; } } @@ -143,7 +161,10 @@ public function getActiveHost() $valid = []; foreach ($this->config['hosts'] as $host => $port) { - if ((new Connection($host, $port))->isServiceListening()) { + $stream = @fsockopen($host, $port, $errno, $errstr, 0.1); + + if ($stream !== false) { + fclose($stream); $valid[$host] = $port; } } diff --git a/src/Connection/Pheanstalk/PheanstalkQueue.php b/src/Connection/Pheanstalk/PheanstalkQueue.php index af0480f..77333d5 100644 --- a/src/Connection/Pheanstalk/PheanstalkQueue.php +++ b/src/Connection/Pheanstalk/PheanstalkQueue.php @@ -21,6 +21,8 @@ use Pheanstalk\Job as PheanstalkJob; use Pheanstalk\Pheanstalk; +use function method_exists; + /** * PheanstalkDriver */ @@ -94,7 +96,14 @@ public function pop(string $queue, int $duration = ConnectionDriverInterface::DU $pheanstalk = $this->connection->pheanstalk(); try { - $job = $pheanstalk->watchOnly($queue)->reserve($duration); + $pheanstalk = $pheanstalk->watchOnly($queue); + + if (method_exists($pheanstalk, 'reserveWithTimeout')) { + $job = $pheanstalk->reserveWithTimeout($duration); + } else { + // Support for Pheanstalk 3 + $job = $pheanstalk->reserve($duration); + } } catch (SocketException $e) { throw new ConnectionLostException($e->getMessage(), $e->getCode(), $e); } catch (BaseServerException $e) { @@ -169,11 +178,14 @@ public function stats(): array $workersInfo = []; foreach ($this->connection->getActiveHost() as $host => $port) { - $pheanstalk = new Pheanstalk($host, $port); + $pheanstalk = method_exists(Pheanstalk::class, 'create') + ? Pheanstalk::create($host, (int) $port) + : new Pheanstalk($host, $port) + ; try { - $queuesInfo = array_merge($queuesInfo, $this->queuesInfo($pheanstalk)); - $workersInfo = array_merge($workersInfo, $this->workersInfo($pheanstalk)); + $queuesInfo = array_merge($queuesInfo, $this->queuesInfo($pheanstalk, $host, $port)); + $workersInfo = array_merge($workersInfo, $this->workersInfo($pheanstalk, $host, $port)); } catch (SocketException $e) { throw new ConnectionLostException($e->getMessage(), $e->getCode(), $e); } catch (BaseServerException $e) { @@ -196,7 +208,7 @@ public function stats(): array * * @return array */ - private function queuesInfo($pheanstalk) + private function queuesInfo($pheanstalk, string $host, int $port): array { $status = []; @@ -206,7 +218,7 @@ private function queuesInfo($pheanstalk) $stats = $pheanstalk->statsTube($tube); $status[] = [ - 'host' => $pheanstalk->getConnection()->getHost().':'.$pheanstalk->getConnection()->getPort(), + 'host' => $host.':'.$port, 'queue' => $stats['name'], 'jobs in queue' => $stats['current-jobs-ready'], 'jobs running' => $stats['current-jobs-reserved'], @@ -232,13 +244,13 @@ private function queuesInfo($pheanstalk) * * @return array */ - private function workersInfo($pheanstalk) + private function workersInfo($pheanstalk, string $host, int $port): array { $jobs = []; foreach ($pheanstalk->listTubes() as $tube) { $job = [ - 'host' => $pheanstalk->getConnection()->getHost().':'.$pheanstalk->getConnection()->getPort(), + 'host' => $host.':'.$port, 'queue' => $tube, 'job ready id' => '', 'job ready data' => '', diff --git a/tests/Connection/Pheanstalk/PheanstalkConnectionTest.php b/tests/Connection/Pheanstalk/PheanstalkConnectionTest.php index 10ffc44..e518883 100644 --- a/tests/Connection/Pheanstalk/PheanstalkConnectionTest.php +++ b/tests/Connection/Pheanstalk/PheanstalkConnectionTest.php @@ -5,10 +5,13 @@ use Bdf\Queue\Connection\Generic\GenericTopic; use Bdf\Queue\Serializer\JsonSerializer; use Pheanstalk\Connection; -use Pheanstalk\PheanstalkInterface; +use Pheanstalk\Contract\PheanstalkInterface; use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; +use function class_exists; +use function method_exists; + /** * @group Bdf_Queue * @group Bdf_Queue_Connection @@ -31,6 +34,7 @@ class PheanstalkConnectionTest extends TestCase */ public function setUp(): void { + class_exists(PheanstalkConnection::class); // Autoload Pheanstalk classes to ensure that interface alias is defined $this->pheanstalk = $this->createMock(PheanstalkInterface::class); $this->connection = new PheanstalkConnection('foo', new JsonSerializer()); @@ -70,6 +74,10 @@ public function test_set_get_pheanstalk() */ public function test_close() { + if (!method_exists($this->pheanstalk, 'getConnection')) { + $this->markTestSkipped('Pheanstalk < 4.0 does not support connection closing'); + } + $connection = $this->createMock(Connection::class); $connection->expects($this->once())->method('disconnect'); $this->pheanstalk->expects($this->once())->method('getConnection')->willReturn($connection); diff --git a/tests/Connection/Pheanstalk/PheanstalkFunctionalTest.php b/tests/Connection/Pheanstalk/PheanstalkFunctionalTest.php index d7204a5..7be3f1d 100644 --- a/tests/Connection/Pheanstalk/PheanstalkFunctionalTest.php +++ b/tests/Connection/Pheanstalk/PheanstalkFunctionalTest.php @@ -48,7 +48,7 @@ public function test_queuePushPop() $message->acknowledge(); - $this->assertNull($queue->pop('test')); + $this->assertNull($queue->pop('test', 1)); } public function test_queueRelease() diff --git a/tests/Connection/Pheanstalk/PheanstalkQueueTest.php b/tests/Connection/Pheanstalk/PheanstalkQueueTest.php index 28dd54e..a76038e 100644 --- a/tests/Connection/Pheanstalk/PheanstalkQueueTest.php +++ b/tests/Connection/Pheanstalk/PheanstalkQueueTest.php @@ -8,13 +8,18 @@ use Bdf\Queue\Message\Message; use Bdf\Queue\Message\QueuedMessage; use Bdf\Queue\Serializer\JsonSerializer; +use Pheanstalk\Contract\PheanstalkInterface; +use Pheanstalk\Contract\ResponseInterface; use Pheanstalk\Exception\SocketException; use Pheanstalk\Job as PheanstalkJob; use Pheanstalk\Pheanstalk; -use Pheanstalk\PheanstalkInterface; +use Pheanstalk\Response\ArrayResponse; use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; +use function class_exists; +use function method_exists; + /** * @group Bdf_Queue * @group Bdf_Queue_Connection @@ -37,6 +42,7 @@ class PheanstalkQueueTest extends TestCase */ public function setUp(): void { + class_exists(PheanstalkConnection::class); // Autoload Pheanstalk classes to ensure that interface alias is defined $this->pheanstalk = $this->createMock(PheanstalkInterface::class); $connection = new PheanstalkConnection('foo', new JsonSerializer()); @@ -139,7 +145,12 @@ public function test_pop() $job->expects($this->once())->method('getData')->willReturn('{"data":"foo"}'); $this->pheanstalk->expects($this->once())->method('watchOnly')->with('queue')->willReturnSelf(); - $this->pheanstalk->expects($this->once())->method('reserve')->with(1)->willReturn($job); + + if (method_exists(Pheanstalk::class, 'reserveWithTimeout')) { + $this->pheanstalk->expects($this->once())->method('reserveWithTimeout')->with(1)->willReturn($job); + } else { + $this->pheanstalk->expects($this->once())->method('reserve')->with(1)->willReturn($job); + } $message = $this->driver->pop('queue', 1)->message(); @@ -156,7 +167,12 @@ public function test_pop() public function test_pop_end_of_queue() { $this->pheanstalk->expects($this->once())->method('watchOnly')->willReturnSelf(); - $this->pheanstalk->expects($this->once())->method('reserve')->willReturn(null); + + if (method_exists(Pheanstalk::class, 'reserveWithTimeout')) { + $this->pheanstalk->expects($this->once())->method('reserveWithTimeout')->willReturn(null); + } else { + $this->pheanstalk->expects($this->once())->method('reserve')->willReturn(null); + } $this->assertSame(null, $this->driver->pop('queue', 1)); } @@ -168,7 +184,11 @@ public function test_pop_error($expected, $internal) { $this->expectException($expected); $this->pheanstalk->expects($this->once())->method('watchOnly')->willReturnSelf(); - $this->pheanstalk->expects($this->once())->method('reserve')->willThrowException($internal); + if (method_exists(Pheanstalk::class, 'reserveWithTimeout')) { + $this->pheanstalk->expects($this->once())->method('reserveWithTimeout')->willThrowException($internal); + } else { + $this->pheanstalk->expects($this->once())->method('reserve')->willThrowException($internal); + } $this->driver->pop('queue', 1); } @@ -236,7 +256,7 @@ public function test_release_error($expected, $internal) */ public function test_count() { - $this->pheanstalk->expects($this->once())->method('statsTube')->with('queue')->willReturn(['current-jobs-ready' => 1]); + $this->pheanstalk->expects($this->once())->method('statsTube')->with('queue')->willReturn(new ArrayResponse('', ['current-jobs-ready' => 1])); $this->assertSame(1, $this->driver->count('queue')); } From 1a4b10daba7be63d67b7504abdb38f97fb423e02 Mon Sep 17 00:00:00 2001 From: Vincent QUATREVIEUX Date: Thu, 9 Oct 2025 09:34:40 +0200 Subject: [PATCH 07/11] ci: fix psalm error --- src/Connection/Pheanstalk/PheanstalkConnection.php | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Connection/Pheanstalk/PheanstalkConnection.php b/src/Connection/Pheanstalk/PheanstalkConnection.php index f5df236..c87cff3 100644 --- a/src/Connection/Pheanstalk/PheanstalkConnection.php +++ b/src/Connection/Pheanstalk/PheanstalkConnection.php @@ -22,6 +22,7 @@ // Support for Pheanstalk 3 if (!interface_exists(PheanstalkInterface::class)) { + /** @psalm-suppress UndefinedClass */ class_alias(\Pheanstalk\PheanstalkInterface::class, PheanstalkInterface::class); } @@ -98,6 +99,7 @@ public function pheanstalk(): PheanstalkInterface $this->pheanstalk = Pheanstalk::create($host, (int) $port, (int) ($this->config['client-timeout'] ?? 10)); } else { // Pheanstalk 3 + /** @psalm-suppress InvalidArgument */ $this->pheanstalk = new Pheanstalk($host, $port, $this->config['client-timeout']); } break; From 7e852d45b5b3c658b2fab2a78a307d3f3433036e Mon Sep 17 00:00:00 2001 From: Vincent QUATREVIEUX Date: Thu, 9 Oct 2025 11:56:53 +0200 Subject: [PATCH 08/11] feat(pheanstalk): Compatibility with v5.0 --- .github/workflows/php.yml | 2 +- composer.json | 2 +- .../Pheanstalk/PheanstalkConnection.php | 35 ++++-- src/Connection/Pheanstalk/PheanstalkQueue.php | 114 ++++++++++++++---- .../Pheanstalk/PheanstalkConnectionTest.php | 6 + .../Pheanstalk/PheanstalkFunctionalTest.php | 36 ++++-- .../Pheanstalk/PheanstalkQueueTest.php | 10 +- 7 files changed, 162 insertions(+), 43 deletions(-) diff --git a/.github/workflows/php.yml b/.github/workflows/php.yml index 325f2b6..a0fcb02 100644 --- a/.github/workflows/php.yml +++ b/.github/workflows/php.yml @@ -44,7 +44,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - pheanstalk-versions: ['3.2.1', '4.0'] + pheanstalk-versions: ['3.2.1', '4.0', '5.0'] name: Pheanstalk ${{ matrix.pheanstalk-versions }} steps: diff --git a/composer.json b/composer.json index 61d4004..9a5d6b5 100644 --- a/composer.json +++ b/composer.json @@ -37,7 +37,7 @@ "enqueue/fs": "~0.9", "league/container": "~3.0", "monolog/monolog": "~2.0", - "pda/pheanstalk": "~3.2|~4.0", + "pda/pheanstalk": "~3.2|~4.0|~5.0", "php-amqplib/php-amqplib": "~3.0", "phpbench/phpbench": "~0.0|~1.0", "phpunit/phpunit": "~9.6", diff --git a/src/Connection/Pheanstalk/PheanstalkConnection.php b/src/Connection/Pheanstalk/PheanstalkConnection.php index c87cff3..4f748de 100644 --- a/src/Connection/Pheanstalk/PheanstalkConnection.php +++ b/src/Connection/Pheanstalk/PheanstalkConnection.php @@ -11,19 +11,30 @@ use Bdf\Queue\Message\MessageSerializationTrait; use Bdf\Queue\Serializer\SerializerInterface; use Bdf\Queue\Util\MultiServer; +use Pheanstalk\Contract\PheanstalkManagerInterface; +use Pheanstalk\Contract\PheanstalkPublisherInterface; +use Pheanstalk\Contract\PheanstalkSubscriberInterface; use Pheanstalk\Pheanstalk; use Pheanstalk\Contract\PheanstalkInterface; +use Pheanstalk\Values\Timeout; + use function class_alias; +use function class_exists; use function fclose; use function fsockopen; use function interface_exists; use function method_exists; -// Support for Pheanstalk 3 if (!interface_exists(PheanstalkInterface::class)) { - /** @psalm-suppress UndefinedClass */ - class_alias(\Pheanstalk\PheanstalkInterface::class, PheanstalkInterface::class); + // Support for Pheanstalk 3 + if (interface_exists(\Pheanstalk\PheanstalkInterface::class)) { + /** @psalm-suppress UndefinedClass */ + class_alias(\Pheanstalk\PheanstalkInterface::class, PheanstalkInterface::class); + } else { + // Support for Pheanstalk 5 + class_alias(PheanstalkManagerInterface::class, PheanstalkInterface::class); + } } /** @@ -34,6 +45,9 @@ class PheanstalkConnection implements ConnectionDriverInterface use ConnectionNamed; use MessageSerializationTrait; + public const DEFAULT_PORT = 11300; + public const DEFAULT_TTR = 60; // 1 minute + /** * @var PheanstalkInterface */ @@ -61,8 +75,8 @@ public function __construct(string $name, SerializerInterface $serializer) */ public function setConfig(array $config): void { - $this->config = MultiServer::prepareMultiServers($config, '127.0.0.1', PheanstalkInterface::DEFAULT_PORT) + [ - 'ttr' => PheanstalkInterface::DEFAULT_TTR, + $this->config = MultiServer::prepareMultiServers($config, '127.0.0.1', self::DEFAULT_PORT) + [ + 'ttr' => self::DEFAULT_TTR, 'client-timeout' => null, ]; } @@ -89,14 +103,21 @@ public function timeToRun(): ?int * @return PheanstalkInterface * @throws ServerNotAvailableException If no servers has been found */ - public function pheanstalk(): PheanstalkInterface + public function pheanstalk() { if ($this->pheanstalk === null) { // Set the first available server // Pheanstalk manage a lazy connection. We can instantiate the client here. foreach ($this->getActiveHost() as $host => $port) { if (method_exists(Pheanstalk::class, 'create')) { - $this->pheanstalk = Pheanstalk::create($host, (int) $port, (int) ($this->config['client-timeout'] ?? 10)); + $timeout = (int) ($this->config['client-timeout'] ?? 10); + + if (class_exists(Timeout::class)) { + // Pheanstalk 5 + $timeout = new Timeout($timeout); + } + + $this->pheanstalk = Pheanstalk::create($host, (int) $port, $timeout); } else { // Pheanstalk 3 /** @psalm-suppress InvalidArgument */ diff --git a/src/Connection/Pheanstalk/PheanstalkQueue.php b/src/Connection/Pheanstalk/PheanstalkQueue.php index 77333d5..2a0a2f7 100644 --- a/src/Connection/Pheanstalk/PheanstalkQueue.php +++ b/src/Connection/Pheanstalk/PheanstalkQueue.php @@ -16,11 +16,18 @@ use Bdf\Queue\Message\QueuedMessage; use Exception; use Pheanstalk\Exception\ClientException; +use Pheanstalk\Exception\ConnectionException as PheanstalkConnectionException; use Pheanstalk\Exception\ServerException as BaseServerException; use Pheanstalk\Exception\SocketException; use Pheanstalk\Job as PheanstalkJob; use Pheanstalk\Pheanstalk; +use Pheanstalk\Values\Job as Pheanstalk5Job; +use Pheanstalk\Values\TubeName; + +use Pheanstalk\Values\TubeStats; + +use function class_exists; use function method_exists; /** @@ -48,15 +55,23 @@ public function push(Message $message): void { $message->setQueuedAt(new \DateTimeImmutable()); $pheanstalk = $this->connection->pheanstalk(); + $queue = $message->queue(); + + if (class_exists(TubeName::class)) { + // Support for Pheanstalk 5 + $queue = new TubeName($queue); + } try { - $pheanstalk->useTube($message->queue())->put( + $pheanstalk->useTube($queue); + + $pheanstalk->put( $this->connection->serializer()->serialize($message), $message->header('priority', Pheanstalk::DEFAULT_PRIORITY), $message->delay(), $message->header('ttr', $this->connection->timeToRun()) ); - } catch (SocketException $e) { + } catch (SocketException|PheanstalkConnectionException $e) { throw new ConnectionLostException($e->getMessage(), $e->getCode(), $e); } catch (BaseServerException $e) { throw new ServerException($e->getMessage(), $e->getCode(), $e); @@ -72,14 +87,21 @@ public function pushRaw($raw, string $queue, int $delay = 0): void { $pheanstalk = $this->connection->pheanstalk(); + if (class_exists(TubeName::class)) { + // Support for Pheanstalk 5 + $queue = new TubeName($queue); + } + try { - $pheanstalk->useTube($queue)->put( + $pheanstalk->useTube($queue); + + $pheanstalk->put( $raw, Pheanstalk::DEFAULT_PRIORITY, $delay, $this->connection->timeToRun() ); - } catch (SocketException $e) { + } catch (SocketException|PheanstalkConnectionException $e) { throw new ConnectionLostException($e->getMessage(), $e->getCode(), $e); } catch (BaseServerException $e) { throw new ServerException($e->getMessage(), $e->getCode(), $e); @@ -95,8 +117,25 @@ public function pop(string $queue, int $duration = ConnectionDriverInterface::DU { $pheanstalk = $this->connection->pheanstalk(); + if (class_exists(TubeName::class)) { + // Support for Pheanstalk 5 + $queue = new TubeName($queue); + } + try { - $pheanstalk = $pheanstalk->watchOnly($queue); + if (method_exists($pheanstalk, 'watchOnly')) { + // Pheanstalk < 5 + $pheanstalk->watchOnly($queue); + } else { + // Pheanstalk 5 + $pheanstalk->watch($queue); + + foreach ($pheanstalk->listTubesWatched() as $tube) { + if ($tube != $queue) { + $pheanstalk->ignore($tube); + } + } + } if (method_exists($pheanstalk, 'reserveWithTimeout')) { $job = $pheanstalk->reserveWithTimeout($duration); @@ -104,7 +143,7 @@ public function pop(string $queue, int $duration = ConnectionDriverInterface::DU // Support for Pheanstalk 3 $job = $pheanstalk->reserve($duration); } - } catch (SocketException $e) { + } catch (SocketException|PheanstalkConnectionException $e) { throw new ConnectionLostException($e->getMessage(), $e->getCode(), $e); } catch (BaseServerException $e) { throw new ServerException($e->getMessage(), $e->getCode(), $e); @@ -112,7 +151,7 @@ public function pop(string $queue, int $duration = ConnectionDriverInterface::DU throw new ConnectionException($e->getMessage(), $e->getCode(), $e); } - if (!$job instanceof PheanstalkJob) { + if (!$job instanceof PheanstalkJob && !$job instanceof Pheanstalk5Job) { return null; } @@ -128,7 +167,7 @@ public function acknowledge(QueuedMessage $message): void { try { $this->connection->pheanstalk()->delete($message->internalJob()); - } catch (SocketException $e) { + } catch (SocketException|PheanstalkConnectionException $e) { throw new ConnectionLostException($e->getMessage(), $e->getCode(), $e); } catch (BaseServerException $e) { throw new ServerException($e->getMessage(), $e->getCode(), $e); @@ -148,7 +187,7 @@ public function release(QueuedMessage $message): void $message->header('priority', Pheanstalk::DEFAULT_PRIORITY), $message->delay() ); - } catch (SocketException $e) { + } catch (SocketException|PheanstalkConnectionException $e) { throw new ConnectionLostException($e->getMessage(), $e->getCode(), $e); } catch (BaseServerException $e) { throw new ServerException($e->getMessage(), $e->getCode(), $e); @@ -162,8 +201,19 @@ public function release(QueuedMessage $message): void */ public function count(string $name): int { + if (class_exists(TubeName::class)) { + // Support for Pheanstalk 5 + $name = new TubeName($name); + } + try { - return $this->connection->pheanstalk()->statsTube($name)['current-jobs-ready']; + $stats = $this->connection->pheanstalk()->statsTube($name); + + if ($stats instanceof TubeStats) { + return $stats->currentJobsReady; + } else { + return $stats['current-jobs-ready']; + } } catch (Exception $e) { return 0; } @@ -186,7 +236,7 @@ public function stats(): array try { $queuesInfo = array_merge($queuesInfo, $this->queuesInfo($pheanstalk, $host, $port)); $workersInfo = array_merge($workersInfo, $this->workersInfo($pheanstalk, $host, $port)); - } catch (SocketException $e) { + } catch (SocketException|PheanstalkConnectionException $e) { throw new ConnectionLostException($e->getMessage(), $e->getCode(), $e); } catch (BaseServerException $e) { throw new ServerException($e->getMessage(), $e->getCode(), $e); @@ -214,21 +264,37 @@ private function queuesInfo($pheanstalk, string $host, int $port): array foreach ($pheanstalk->listTubes() as $tube) { try { - /** @var \Pheanstalk\Response\ArrayResponse $stats */ + /** @var \Pheanstalk\Response\ArrayResponse|TubeStats $stats */ $stats = $pheanstalk->statsTube($tube); - $status[] = [ - 'host' => $host.':'.$port, - 'queue' => $stats['name'], - 'jobs in queue' => $stats['current-jobs-ready'], - 'jobs running' => $stats['current-jobs-reserved'], - 'jobs delayed' => $stats['current-jobs-delayed'], -// 'jobs buried' => $stats['current-jobs-buried'], - 'total jobs' => $stats['total-jobs'], -// 'workers using' => $stats['current-using'], - 'workers waiting' => $stats['current-waiting'], - 'workers watching' => --$stats['current-watching'], // remove the monitoring - ]; + if ($stats instanceof TubeStats) { + // Pheanstalk 5 + $status[] = [ + 'host' => $host.':'.$port, + 'queue' => $stats->name->value, + 'jobs in queue' => $stats->currentJobsReady, + 'jobs running' => $stats->currentJobsReserved, + 'jobs delayed' => $stats->currentJobsDelayed, + // 'jobs buried' => $stats['current-jobs-buried'], + 'total jobs' => $stats->totalJobs, + // 'workers using' => $stats['current-using'], + 'workers waiting' => $stats->currentWaiting, + 'workers watching' => $stats->currentWatching - 1, // remove the monitoring + ]; + } else { + $status[] = [ + 'host' => $host.':'.$port, + 'queue' => $stats['name'], + 'jobs in queue' => $stats['current-jobs-ready'], + 'jobs running' => $stats['current-jobs-reserved'], + 'jobs delayed' => $stats['current-jobs-delayed'], + // 'jobs buried' => $stats['current-jobs-buried'], + 'total jobs' => $stats['total-jobs'], + // 'workers using' => $stats['current-using'], + 'workers waiting' => $stats['current-waiting'], + 'workers watching' => --$stats['current-watching'], // remove the monitoring + ]; + } } catch (Exception $e) { // tube not found } diff --git a/tests/Connection/Pheanstalk/PheanstalkConnectionTest.php b/tests/Connection/Pheanstalk/PheanstalkConnectionTest.php index e518883..45996a6 100644 --- a/tests/Connection/Pheanstalk/PheanstalkConnectionTest.php +++ b/tests/Connection/Pheanstalk/PheanstalkConnectionTest.php @@ -6,10 +6,12 @@ use Bdf\Queue\Serializer\JsonSerializer; use Pheanstalk\Connection; use Pheanstalk\Contract\PheanstalkInterface; +use Pheanstalk\Contract\PheanstalkSubscriberInterface; use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; use function class_exists; +use function interface_exists; use function method_exists; /** @@ -34,6 +36,10 @@ class PheanstalkConnectionTest extends TestCase */ public function setUp(): void { + if (interface_exists(PheanstalkSubscriberInterface::class)) { + $this->markTestSkipped('Pheanstalk >= 5 is not supported'); + } + class_exists(PheanstalkConnection::class); // Autoload Pheanstalk classes to ensure that interface alias is defined $this->pheanstalk = $this->createMock(PheanstalkInterface::class); diff --git a/tests/Connection/Pheanstalk/PheanstalkFunctionalTest.php b/tests/Connection/Pheanstalk/PheanstalkFunctionalTest.php index 7be3f1d..ee8edb6 100644 --- a/tests/Connection/Pheanstalk/PheanstalkFunctionalTest.php +++ b/tests/Connection/Pheanstalk/PheanstalkFunctionalTest.php @@ -5,16 +5,17 @@ use Bdf\Queue\Connection\Pheanstalk\PheanstalkConnection; use Bdf\Queue\Message\Message; use Bdf\Queue\Message\QueueEnvelope; -use Bdf\Queue\Message\TopicEnvelope; use Bdf\Queue\Serializer\JsonSerializer; use PHPUnit\Framework\TestCase; use function pcntl_fork; use function usleep; +use function var_dump; class PheanstalkFunctionalTest extends TestCase { private PheanstalkConnection $connection; + private string $queueName; protected function setUp(): void { @@ -30,48 +31,67 @@ protected function setUp(): void 'host' => $host, 'port' => $port, ]); + + $this->queueName = 'test_queue_' . bin2hex(random_bytes(5)); } public function test_queuePushPop() { $queue = $this->connection->queue(); + + $this->assertEquals(0, $queue->count($this->queueName)); $queue->push( (new Message(['foo' => 'bar'])) - ->setQueue('test') + ->setQueue($this->queueName) ); + $this->assertEquals(1, $queue->count($this->queueName)); - $message = $queue->pop('test'); - $this->assertEquals('test', $message->message()->destination()); + $message = $queue->pop($this->queueName); + $this->assertEquals($this->queueName, $message->message()->queue()); $this->assertSame(['foo' => 'bar'], $message->message()->data()); $this->assertFalse($message->isRejected()); $this->assertFalse($message->isDeleted()); $message->acknowledge(); - $this->assertNull($queue->pop('test', 1)); + $this->assertEquals(0, $queue->count($this->queueName)); + $this->assertNull($queue->pop($this->queueName, 1)); } public function test_queueRelease() { $queue = $this->connection->queue(); + $this->assertSame(0, $queue->count($this->queueName)); $queue->push( (new Message(['foo' => 'bar'])) - ->setQueue('test') + ->setQueue($this->queueName) ); + $this->assertSame(1, $queue->count($this->queueName)); - $message = $queue->pop('test'); + $message = $queue->pop($this->queueName); $this->assertSame(['foo' => 'bar'], $message->message()->data()); $this->assertFalse($message->isRejected()); $this->assertFalse($message->isDeleted()); $queue->release($message->message()); - $this->assertEquals($message, $queue->pop('test')); + $this->assertSame(1, $queue->count($this->queueName)); + $this->assertEquals($message, $queue->pop($this->queueName)); $this->assertFalse($message->isRejected()); $this->assertFalse($message->isDeleted()); $message->acknowledge(); } + public function test_queueStats() + { + $queue = $this->connection->queue(); + $stats = $queue->stats(); + + $this->assertIsArray($stats); + $this->assertArrayHasKey('queues', $stats); + $this->assertArrayHasKey('workers', $stats); + } + public function test_topic() { $messages = []; diff --git a/tests/Connection/Pheanstalk/PheanstalkQueueTest.php b/tests/Connection/Pheanstalk/PheanstalkQueueTest.php index a76038e..f4ff786 100644 --- a/tests/Connection/Pheanstalk/PheanstalkQueueTest.php +++ b/tests/Connection/Pheanstalk/PheanstalkQueueTest.php @@ -9,6 +9,7 @@ use Bdf\Queue\Message\QueuedMessage; use Bdf\Queue\Serializer\JsonSerializer; use Pheanstalk\Contract\PheanstalkInterface; +use Pheanstalk\Contract\PheanstalkSubscriberInterface; use Pheanstalk\Contract\ResponseInterface; use Pheanstalk\Exception\SocketException; use Pheanstalk\Job as PheanstalkJob; @@ -18,6 +19,7 @@ use PHPUnit\Framework\TestCase; use function class_exists; +use function interface_exists; use function method_exists; /** @@ -42,6 +44,10 @@ class PheanstalkQueueTest extends TestCase */ public function setUp(): void { + if (interface_exists(PheanstalkSubscriberInterface::class)) { + $this->markTestSkipped('Pheanstalk >= 5 is not supported'); + } + class_exists(PheanstalkConnection::class); // Autoload Pheanstalk classes to ensure that interface alias is defined $this->pheanstalk = $this->createMock(PheanstalkInterface::class); @@ -103,10 +109,10 @@ public function test_push_error($expected, $internal) $this->driver->push($message); } - public function provideExceptions() + public static function provideExceptions() { return [ - [ConnectionLostException::class, new SocketException()], + [ConnectionLostException::class, class_exists(SocketException::class) ? new SocketException() : new \Pheanstalk\Exception\ConnectionException(1, '')], [ServerException::class, new \Pheanstalk\Exception\ServerException()], [ConnectionException::class, new \Pheanstalk\Exception\ClientException()], ]; From 22550d3d413c050cd95306a44d0f184bee5f6ea7 Mon Sep 17 00:00:00 2001 From: Vincent QUATREVIEUX Date: Thu, 9 Oct 2025 13:54:11 +0200 Subject: [PATCH 09/11] ci: ignore psalm for pheanstalk --- psalm.xml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/psalm.xml b/psalm.xml index c108b07..a6fe30f 100644 --- a/psalm.xml +++ b/psalm.xml @@ -12,6 +12,9 @@ + + + From d238416838fd517c7154dd8d0e77b2ae2172976c Mon Sep 17 00:00:00 2001 From: Vincent QUATREVIEUX Date: Thu, 9 Oct 2025 14:17:25 +0200 Subject: [PATCH 10/11] feat(pheanstalk): Compatibility with v6 to v8 --- .github/workflows/php.yml | 2 +- composer.json | 2 +- src/Connection/Pheanstalk/PheanstalkConnection.php | 3 +++ 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/.github/workflows/php.yml b/.github/workflows/php.yml index a0fcb02..69534f3 100644 --- a/.github/workflows/php.yml +++ b/.github/workflows/php.yml @@ -44,7 +44,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - pheanstalk-versions: ['3.2.1', '4.0', '5.0'] + pheanstalk-versions: ['3.2.1', '4.0', '5.0', '6.0', '7.0', '8.0'] name: Pheanstalk ${{ matrix.pheanstalk-versions }} steps: diff --git a/composer.json b/composer.json index 9a5d6b5..b09f4ed 100644 --- a/composer.json +++ b/composer.json @@ -37,7 +37,7 @@ "enqueue/fs": "~0.9", "league/container": "~3.0", "monolog/monolog": "~2.0", - "pda/pheanstalk": "~3.2|~4.0|~5.0", + "pda/pheanstalk": "~3.2|~4.0|~5.0|~6.0|~7.0|~8.0", "php-amqplib/php-amqplib": "~3.0", "phpbench/phpbench": "~0.0|~1.0", "phpunit/phpunit": "~9.6", diff --git a/src/Connection/Pheanstalk/PheanstalkConnection.php b/src/Connection/Pheanstalk/PheanstalkConnection.php index 4f748de..95a6ceb 100644 --- a/src/Connection/Pheanstalk/PheanstalkConnection.php +++ b/src/Connection/Pheanstalk/PheanstalkConnection.php @@ -148,6 +148,9 @@ public function close(): void if ($this->pheanstalk !== null) { if (method_exists($this->pheanstalk, 'getConnection')) { $this->pheanstalk->getConnection()->disconnect(); + } elseif (method_exists($this->pheanstalk, 'disconnect')) { + // Pheanstalk 7 + $this->pheanstalk->disconnect(); } $this->pheanstalk = null; From d0e132def14ce7d93b24049028ff6b03b97a5bab Mon Sep 17 00:00:00 2001 From: Vincent QUATREVIEUX Date: Tue, 18 Nov 2025 10:34:57 +0100 Subject: [PATCH 11/11] chore(beanstalk): Drop support of pheanstalk v3 --- .github/workflows/php.yml | 2 +- composer.json | 5 ++- .../Pheanstalk/PheanstalkConnection.php | 37 +++++-------------- src/Connection/Pheanstalk/PheanstalkQueue.php | 15 +------- .../Pheanstalk/PheanstalkConnectionTest.php | 8 +--- 5 files changed, 17 insertions(+), 50 deletions(-) diff --git a/.github/workflows/php.yml b/.github/workflows/php.yml index 69534f3..8ed0e8f 100644 --- a/.github/workflows/php.yml +++ b/.github/workflows/php.yml @@ -44,7 +44,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - pheanstalk-versions: ['3.2.1', '4.0', '5.0', '6.0', '7.0', '8.0'] + pheanstalk-versions: ['4.0', '5.0', '6.0', '7.0', '8.0'] name: Pheanstalk ${{ matrix.pheanstalk-versions }} steps: diff --git a/composer.json b/composer.json index b09f4ed..08843b5 100644 --- a/composer.json +++ b/composer.json @@ -37,7 +37,7 @@ "enqueue/fs": "~0.9", "league/container": "~3.0", "monolog/monolog": "~2.0", - "pda/pheanstalk": "~3.2|~4.0|~5.0|~6.0|~7.0|~8.0", + "pda/pheanstalk": "~4.0|~5.0|~6.0|~7.0|~8.0", "php-amqplib/php-amqplib": "~3.0", "phpbench/phpbench": "~0.0|~1.0", "phpunit/phpunit": "~9.6", @@ -60,7 +60,8 @@ "symfony/var-dumper": "VarDumper could be used for displaying failed message (~5.4|~6.0|~7.0)" }, "conflict": { - "enqueue/dsn": "0.10.25" + "enqueue/dsn": "0.10.25", + "pda/pheanstalk": "<4.0" }, "scripts": { "tests": "phpunit", diff --git a/src/Connection/Pheanstalk/PheanstalkConnection.php b/src/Connection/Pheanstalk/PheanstalkConnection.php index 95a6ceb..06e0b27 100644 --- a/src/Connection/Pheanstalk/PheanstalkConnection.php +++ b/src/Connection/Pheanstalk/PheanstalkConnection.php @@ -12,11 +12,8 @@ use Bdf\Queue\Serializer\SerializerInterface; use Bdf\Queue\Util\MultiServer; use Pheanstalk\Contract\PheanstalkManagerInterface; -use Pheanstalk\Contract\PheanstalkPublisherInterface; -use Pheanstalk\Contract\PheanstalkSubscriberInterface; use Pheanstalk\Pheanstalk; use Pheanstalk\Contract\PheanstalkInterface; - use Pheanstalk\Values\Timeout; use function class_alias; @@ -27,14 +24,8 @@ use function method_exists; if (!interface_exists(PheanstalkInterface::class)) { - // Support for Pheanstalk 3 - if (interface_exists(\Pheanstalk\PheanstalkInterface::class)) { - /** @psalm-suppress UndefinedClass */ - class_alias(\Pheanstalk\PheanstalkInterface::class, PheanstalkInterface::class); - } else { - // Support for Pheanstalk 5 - class_alias(PheanstalkManagerInterface::class, PheanstalkInterface::class); - } + // Support for Pheanstalk 5 + class_alias(PheanstalkManagerInterface::class, PheanstalkInterface::class); } /** @@ -109,20 +100,14 @@ public function pheanstalk() // Set the first available server // Pheanstalk manage a lazy connection. We can instantiate the client here. foreach ($this->getActiveHost() as $host => $port) { - if (method_exists(Pheanstalk::class, 'create')) { - $timeout = (int) ($this->config['client-timeout'] ?? 10); - - if (class_exists(Timeout::class)) { - // Pheanstalk 5 - $timeout = new Timeout($timeout); - } - - $this->pheanstalk = Pheanstalk::create($host, (int) $port, $timeout); - } else { - // Pheanstalk 3 - /** @psalm-suppress InvalidArgument */ - $this->pheanstalk = new Pheanstalk($host, $port, $this->config['client-timeout']); + $timeout = (int) ($this->config['client-timeout'] ?? 10); + + if (class_exists(Timeout::class)) { + // Pheanstalk 5 + $timeout = new Timeout($timeout); } + + $this->pheanstalk = Pheanstalk::create($host, (int) $port, $timeout); break; } } @@ -146,9 +131,7 @@ public function setPheanstalk(PheanstalkInterface $pheanstalk) public function close(): void { if ($this->pheanstalk !== null) { - if (method_exists($this->pheanstalk, 'getConnection')) { - $this->pheanstalk->getConnection()->disconnect(); - } elseif (method_exists($this->pheanstalk, 'disconnect')) { + if (method_exists($this->pheanstalk, 'disconnect')) { // Pheanstalk 7 $this->pheanstalk->disconnect(); } diff --git a/src/Connection/Pheanstalk/PheanstalkQueue.php b/src/Connection/Pheanstalk/PheanstalkQueue.php index 2a0a2f7..4e2de69 100644 --- a/src/Connection/Pheanstalk/PheanstalkQueue.php +++ b/src/Connection/Pheanstalk/PheanstalkQueue.php @@ -5,7 +5,6 @@ use Bdf\Queue\Connection\ConnectionDriverInterface; use Bdf\Queue\Connection\CountableQueueDriverInterface; use Bdf\Queue\Connection\Exception\ConnectionException; -use Bdf\Queue\Connection\Exception\ConnectionFailedException; use Bdf\Queue\Connection\Exception\ConnectionLostException; use Bdf\Queue\Connection\Exception\ServerException; use Bdf\Queue\Connection\Extension\ConnectionBearer; @@ -21,10 +20,8 @@ use Pheanstalk\Exception\SocketException; use Pheanstalk\Job as PheanstalkJob; use Pheanstalk\Pheanstalk; - use Pheanstalk\Values\Job as Pheanstalk5Job; use Pheanstalk\Values\TubeName; - use Pheanstalk\Values\TubeStats; use function class_exists; @@ -137,12 +134,7 @@ public function pop(string $queue, int $duration = ConnectionDriverInterface::DU } } - if (method_exists($pheanstalk, 'reserveWithTimeout')) { - $job = $pheanstalk->reserveWithTimeout($duration); - } else { - // Support for Pheanstalk 3 - $job = $pheanstalk->reserve($duration); - } + $job = $pheanstalk->reserveWithTimeout($duration); } catch (SocketException|PheanstalkConnectionException $e) { throw new ConnectionLostException($e->getMessage(), $e->getCode(), $e); } catch (BaseServerException $e) { @@ -228,10 +220,7 @@ public function stats(): array $workersInfo = []; foreach ($this->connection->getActiveHost() as $host => $port) { - $pheanstalk = method_exists(Pheanstalk::class, 'create') - ? Pheanstalk::create($host, (int) $port) - : new Pheanstalk($host, $port) - ; + $pheanstalk = Pheanstalk::create($host, (int) $port); try { $queuesInfo = array_merge($queuesInfo, $this->queuesInfo($pheanstalk, $host, $port)); diff --git a/tests/Connection/Pheanstalk/PheanstalkConnectionTest.php b/tests/Connection/Pheanstalk/PheanstalkConnectionTest.php index 45996a6..6e7e93e 100644 --- a/tests/Connection/Pheanstalk/PheanstalkConnectionTest.php +++ b/tests/Connection/Pheanstalk/PheanstalkConnectionTest.php @@ -80,13 +80,7 @@ public function test_set_get_pheanstalk() */ public function test_close() { - if (!method_exists($this->pheanstalk, 'getConnection')) { - $this->markTestSkipped('Pheanstalk < 4.0 does not support connection closing'); - } - - $connection = $this->createMock(Connection::class); - $connection->expects($this->once())->method('disconnect'); - $this->pheanstalk->expects($this->once())->method('getConnection')->willReturn($connection); + $this->expectNotToPerformAssertions(); $this->connection->close(); // close once