diff --git a/.github/workflows/php.yml b/.github/workflows/php.yml index bae188b..8ed0e8f 100644 --- a/.github/workflows/php.yml +++ b/.github/workflows/php.yml @@ -40,6 +40,42 @@ jobs: - name: Run test suite run: composer run-script tests + pheanstalk_compatibility: + runs-on: ubuntu-latest + strategy: + matrix: + pheanstalk-versions: ['4.0', '5.0', '6.0', '7.0', '8.0'] + name: Pheanstalk ${{ matrix.pheanstalk-versions }} + + steps: + - uses: actions/checkout@v2 + + - name: Set Timezone + uses: szenius/set-timezone@v1.0 + with: + timezoneLinux: "Europe/Paris" + + - name: Install PHP + uses: shivammathur/setup-php@v2 + with: + php-version: 8.4 + extensions: json + ini-values: date.timezone=Europe/Paris + + - name: Install & run Beanstalkd + run: | + sudo apt-get install -y beanstalkd + sudo systemctl start beanstalkd.service + + - name: Install dependencies + run: composer install --prefer-dist --no-progress + + - name: Require Pheanstalk version + run: composer require --dev "pda/pheanstalk:~${{ matrix.pheanstalk-versions }}" + + - name: Run test suite + run: composer run-script tests + analysis: name: Analysis runs-on: ubuntu-latest diff --git a/composer.json b/composer.json index 3955864..08843b5 100644 --- a/composer.json +++ b/composer.json @@ -37,7 +37,7 @@ "enqueue/fs": "~0.9", "league/container": "~3.0", "monolog/monolog": "~2.0", - "pda/pheanstalk": "^3.1@dev", + "pda/pheanstalk": "~4.0|~5.0|~6.0|~7.0|~8.0", "php-amqplib/php-amqplib": "~3.0", "phpbench/phpbench": "~0.0|~1.0", "phpunit/phpunit": "~9.6", @@ -60,7 +60,8 @@ "symfony/var-dumper": "VarDumper could be used for displaying failed message (~5.4|~6.0|~7.0)" }, "conflict": { - "enqueue/dsn": "0.10.25" + "enqueue/dsn": "0.10.25", + "pda/pheanstalk": "<4.0" }, "scripts": { "tests": "phpunit", diff --git a/psalm.xml b/psalm.xml index c108b07..a6fe30f 100644 --- a/psalm.xml +++ b/psalm.xml @@ -12,6 +12,9 @@ + + + diff --git a/src/Connection/Pheanstalk/PheanstalkConnection.php b/src/Connection/Pheanstalk/PheanstalkConnection.php index 86ca61d..06e0b27 100644 --- a/src/Connection/Pheanstalk/PheanstalkConnection.php +++ b/src/Connection/Pheanstalk/PheanstalkConnection.php @@ -11,9 +11,22 @@ use Bdf\Queue\Message\MessageSerializationTrait; use Bdf\Queue\Serializer\SerializerInterface; use Bdf\Queue\Util\MultiServer; -use Pheanstalk\Connection; +use Pheanstalk\Contract\PheanstalkManagerInterface; use Pheanstalk\Pheanstalk; -use Pheanstalk\PheanstalkInterface; +use Pheanstalk\Contract\PheanstalkInterface; +use Pheanstalk\Values\Timeout; + +use function class_alias; +use function class_exists; +use function fclose; +use function fsockopen; +use function interface_exists; +use function method_exists; + +if (!interface_exists(PheanstalkInterface::class)) { + // Support for Pheanstalk 5 + class_alias(PheanstalkManagerInterface::class, PheanstalkInterface::class); +} /** * PheanstalkConnection @@ -23,6 +36,9 @@ class PheanstalkConnection implements ConnectionDriverInterface use ConnectionNamed; use MessageSerializationTrait; + public const DEFAULT_PORT = 11300; + public const DEFAULT_TTR = 60; // 1 minute + /** * @var PheanstalkInterface */ @@ -50,8 +66,8 @@ public function __construct(string $name, SerializerInterface $serializer) */ public function setConfig(array $config): void { - $this->config = MultiServer::prepareMultiServers($config, '127.0.0.1', PheanstalkInterface::DEFAULT_PORT) + [ - 'ttr' => PheanstalkInterface::DEFAULT_TTR, + $this->config = MultiServer::prepareMultiServers($config, '127.0.0.1', self::DEFAULT_PORT) + [ + 'ttr' => self::DEFAULT_TTR, 'client-timeout' => null, ]; } @@ -78,13 +94,20 @@ public function timeToRun(): ?int * @return PheanstalkInterface * @throws ServerNotAvailableException If no servers has been found */ - public function pheanstalk(): PheanstalkInterface + public function pheanstalk() { if ($this->pheanstalk === null) { // Set the first available server // Pheanstalk manage a lazy connection. We can instantiate the client here. foreach ($this->getActiveHost() as $host => $port) { - $this->pheanstalk = new Pheanstalk($host, $port, $this->config['client-timeout']); + $timeout = (int) ($this->config['client-timeout'] ?? 10); + + if (class_exists(Timeout::class)) { + // Pheanstalk 5 + $timeout = new Timeout($timeout); + } + + $this->pheanstalk = Pheanstalk::create($host, (int) $port, $timeout); break; } } @@ -108,7 +131,11 @@ public function setPheanstalk(PheanstalkInterface $pheanstalk) public function close(): void { if ($this->pheanstalk !== null) { - $this->pheanstalk->getConnection()->disconnect(); + if (method_exists($this->pheanstalk, 'disconnect')) { + // Pheanstalk 7 + $this->pheanstalk->disconnect(); + } + $this->pheanstalk = null; } } @@ -143,7 +170,10 @@ public function getActiveHost() $valid = []; foreach ($this->config['hosts'] as $host => $port) { - if ((new Connection($host, $port))->isServiceListening()) { + $stream = @fsockopen($host, $port, $errno, $errstr, 0.1); + + if ($stream !== false) { + fclose($stream); $valid[$host] = $port; } } diff --git a/src/Connection/Pheanstalk/PheanstalkQueue.php b/src/Connection/Pheanstalk/PheanstalkQueue.php index af0480f..4e2de69 100644 --- a/src/Connection/Pheanstalk/PheanstalkQueue.php +++ b/src/Connection/Pheanstalk/PheanstalkQueue.php @@ -5,7 +5,6 @@ use Bdf\Queue\Connection\ConnectionDriverInterface; use Bdf\Queue\Connection\CountableQueueDriverInterface; use Bdf\Queue\Connection\Exception\ConnectionException; -use Bdf\Queue\Connection\Exception\ConnectionFailedException; use Bdf\Queue\Connection\Exception\ConnectionLostException; use Bdf\Queue\Connection\Exception\ServerException; use Bdf\Queue\Connection\Extension\ConnectionBearer; @@ -16,10 +15,17 @@ use Bdf\Queue\Message\QueuedMessage; use Exception; use Pheanstalk\Exception\ClientException; +use Pheanstalk\Exception\ConnectionException as PheanstalkConnectionException; use Pheanstalk\Exception\ServerException as BaseServerException; use Pheanstalk\Exception\SocketException; use Pheanstalk\Job as PheanstalkJob; use Pheanstalk\Pheanstalk; +use Pheanstalk\Values\Job as Pheanstalk5Job; +use Pheanstalk\Values\TubeName; +use Pheanstalk\Values\TubeStats; + +use function class_exists; +use function method_exists; /** * PheanstalkDriver @@ -46,15 +52,23 @@ public function push(Message $message): void { $message->setQueuedAt(new \DateTimeImmutable()); $pheanstalk = $this->connection->pheanstalk(); + $queue = $message->queue(); + + if (class_exists(TubeName::class)) { + // Support for Pheanstalk 5 + $queue = new TubeName($queue); + } try { - $pheanstalk->useTube($message->queue())->put( + $pheanstalk->useTube($queue); + + $pheanstalk->put( $this->connection->serializer()->serialize($message), $message->header('priority', Pheanstalk::DEFAULT_PRIORITY), $message->delay(), $message->header('ttr', $this->connection->timeToRun()) ); - } catch (SocketException $e) { + } catch (SocketException|PheanstalkConnectionException $e) { throw new ConnectionLostException($e->getMessage(), $e->getCode(), $e); } catch (BaseServerException $e) { throw new ServerException($e->getMessage(), $e->getCode(), $e); @@ -70,14 +84,21 @@ public function pushRaw($raw, string $queue, int $delay = 0): void { $pheanstalk = $this->connection->pheanstalk(); + if (class_exists(TubeName::class)) { + // Support for Pheanstalk 5 + $queue = new TubeName($queue); + } + try { - $pheanstalk->useTube($queue)->put( + $pheanstalk->useTube($queue); + + $pheanstalk->put( $raw, Pheanstalk::DEFAULT_PRIORITY, $delay, $this->connection->timeToRun() ); - } catch (SocketException $e) { + } catch (SocketException|PheanstalkConnectionException $e) { throw new ConnectionLostException($e->getMessage(), $e->getCode(), $e); } catch (BaseServerException $e) { throw new ServerException($e->getMessage(), $e->getCode(), $e); @@ -93,9 +114,28 @@ public function pop(string $queue, int $duration = ConnectionDriverInterface::DU { $pheanstalk = $this->connection->pheanstalk(); + if (class_exists(TubeName::class)) { + // Support for Pheanstalk 5 + $queue = new TubeName($queue); + } + try { - $job = $pheanstalk->watchOnly($queue)->reserve($duration); - } catch (SocketException $e) { + if (method_exists($pheanstalk, 'watchOnly')) { + // Pheanstalk < 5 + $pheanstalk->watchOnly($queue); + } else { + // Pheanstalk 5 + $pheanstalk->watch($queue); + + foreach ($pheanstalk->listTubesWatched() as $tube) { + if ($tube != $queue) { + $pheanstalk->ignore($tube); + } + } + } + + $job = $pheanstalk->reserveWithTimeout($duration); + } catch (SocketException|PheanstalkConnectionException $e) { throw new ConnectionLostException($e->getMessage(), $e->getCode(), $e); } catch (BaseServerException $e) { throw new ServerException($e->getMessage(), $e->getCode(), $e); @@ -103,7 +143,7 @@ public function pop(string $queue, int $duration = ConnectionDriverInterface::DU throw new ConnectionException($e->getMessage(), $e->getCode(), $e); } - if (!$job instanceof PheanstalkJob) { + if (!$job instanceof PheanstalkJob && !$job instanceof Pheanstalk5Job) { return null; } @@ -119,7 +159,7 @@ public function acknowledge(QueuedMessage $message): void { try { $this->connection->pheanstalk()->delete($message->internalJob()); - } catch (SocketException $e) { + } catch (SocketException|PheanstalkConnectionException $e) { throw new ConnectionLostException($e->getMessage(), $e->getCode(), $e); } catch (BaseServerException $e) { throw new ServerException($e->getMessage(), $e->getCode(), $e); @@ -139,7 +179,7 @@ public function release(QueuedMessage $message): void $message->header('priority', Pheanstalk::DEFAULT_PRIORITY), $message->delay() ); - } catch (SocketException $e) { + } catch (SocketException|PheanstalkConnectionException $e) { throw new ConnectionLostException($e->getMessage(), $e->getCode(), $e); } catch (BaseServerException $e) { throw new ServerException($e->getMessage(), $e->getCode(), $e); @@ -153,8 +193,19 @@ public function release(QueuedMessage $message): void */ public function count(string $name): int { + if (class_exists(TubeName::class)) { + // Support for Pheanstalk 5 + $name = new TubeName($name); + } + try { - return $this->connection->pheanstalk()->statsTube($name)['current-jobs-ready']; + $stats = $this->connection->pheanstalk()->statsTube($name); + + if ($stats instanceof TubeStats) { + return $stats->currentJobsReady; + } else { + return $stats['current-jobs-ready']; + } } catch (Exception $e) { return 0; } @@ -169,12 +220,12 @@ public function stats(): array $workersInfo = []; foreach ($this->connection->getActiveHost() as $host => $port) { - $pheanstalk = new Pheanstalk($host, $port); + $pheanstalk = Pheanstalk::create($host, (int) $port); try { - $queuesInfo = array_merge($queuesInfo, $this->queuesInfo($pheanstalk)); - $workersInfo = array_merge($workersInfo, $this->workersInfo($pheanstalk)); - } catch (SocketException $e) { + $queuesInfo = array_merge($queuesInfo, $this->queuesInfo($pheanstalk, $host, $port)); + $workersInfo = array_merge($workersInfo, $this->workersInfo($pheanstalk, $host, $port)); + } catch (SocketException|PheanstalkConnectionException $e) { throw new ConnectionLostException($e->getMessage(), $e->getCode(), $e); } catch (BaseServerException $e) { throw new ServerException($e->getMessage(), $e->getCode(), $e); @@ -196,27 +247,43 @@ public function stats(): array * * @return array */ - private function queuesInfo($pheanstalk) + private function queuesInfo($pheanstalk, string $host, int $port): array { $status = []; foreach ($pheanstalk->listTubes() as $tube) { try { - /** @var \Pheanstalk\Response\ArrayResponse $stats */ + /** @var \Pheanstalk\Response\ArrayResponse|TubeStats $stats */ $stats = $pheanstalk->statsTube($tube); - $status[] = [ - 'host' => $pheanstalk->getConnection()->getHost().':'.$pheanstalk->getConnection()->getPort(), - 'queue' => $stats['name'], - 'jobs in queue' => $stats['current-jobs-ready'], - 'jobs running' => $stats['current-jobs-reserved'], - 'jobs delayed' => $stats['current-jobs-delayed'], -// 'jobs buried' => $stats['current-jobs-buried'], - 'total jobs' => $stats['total-jobs'], -// 'workers using' => $stats['current-using'], - 'workers waiting' => $stats['current-waiting'], - 'workers watching' => --$stats['current-watching'], // remove the monitoring - ]; + if ($stats instanceof TubeStats) { + // Pheanstalk 5 + $status[] = [ + 'host' => $host.':'.$port, + 'queue' => $stats->name->value, + 'jobs in queue' => $stats->currentJobsReady, + 'jobs running' => $stats->currentJobsReserved, + 'jobs delayed' => $stats->currentJobsDelayed, + // 'jobs buried' => $stats['current-jobs-buried'], + 'total jobs' => $stats->totalJobs, + // 'workers using' => $stats['current-using'], + 'workers waiting' => $stats->currentWaiting, + 'workers watching' => $stats->currentWatching - 1, // remove the monitoring + ]; + } else { + $status[] = [ + 'host' => $host.':'.$port, + 'queue' => $stats['name'], + 'jobs in queue' => $stats['current-jobs-ready'], + 'jobs running' => $stats['current-jobs-reserved'], + 'jobs delayed' => $stats['current-jobs-delayed'], + // 'jobs buried' => $stats['current-jobs-buried'], + 'total jobs' => $stats['total-jobs'], + // 'workers using' => $stats['current-using'], + 'workers waiting' => $stats['current-waiting'], + 'workers watching' => --$stats['current-watching'], // remove the monitoring + ]; + } } catch (Exception $e) { // tube not found } @@ -232,13 +299,13 @@ private function queuesInfo($pheanstalk) * * @return array */ - private function workersInfo($pheanstalk) + private function workersInfo($pheanstalk, string $host, int $port): array { $jobs = []; foreach ($pheanstalk->listTubes() as $tube) { $job = [ - 'host' => $pheanstalk->getConnection()->getHost().':'.$pheanstalk->getConnection()->getPort(), + 'host' => $host.':'.$port, 'queue' => $tube, 'job ready id' => '', 'job ready data' => '', diff --git a/tests/Connection/Pheanstalk/PheanstalkConnectionTest.php b/tests/Connection/Pheanstalk/PheanstalkConnectionTest.php index 10ffc44..6e7e93e 100644 --- a/tests/Connection/Pheanstalk/PheanstalkConnectionTest.php +++ b/tests/Connection/Pheanstalk/PheanstalkConnectionTest.php @@ -5,10 +5,15 @@ use Bdf\Queue\Connection\Generic\GenericTopic; use Bdf\Queue\Serializer\JsonSerializer; use Pheanstalk\Connection; -use Pheanstalk\PheanstalkInterface; +use Pheanstalk\Contract\PheanstalkInterface; +use Pheanstalk\Contract\PheanstalkSubscriberInterface; use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; +use function class_exists; +use function interface_exists; +use function method_exists; + /** * @group Bdf_Queue * @group Bdf_Queue_Connection @@ -31,6 +36,11 @@ class PheanstalkConnectionTest extends TestCase */ public function setUp(): void { + if (interface_exists(PheanstalkSubscriberInterface::class)) { + $this->markTestSkipped('Pheanstalk >= 5 is not supported'); + } + + class_exists(PheanstalkConnection::class); // Autoload Pheanstalk classes to ensure that interface alias is defined $this->pheanstalk = $this->createMock(PheanstalkInterface::class); $this->connection = new PheanstalkConnection('foo', new JsonSerializer()); @@ -70,9 +80,7 @@ public function test_set_get_pheanstalk() */ public function test_close() { - $connection = $this->createMock(Connection::class); - $connection->expects($this->once())->method('disconnect'); - $this->pheanstalk->expects($this->once())->method('getConnection')->willReturn($connection); + $this->expectNotToPerformAssertions(); $this->connection->close(); // close once diff --git a/tests/Connection/Pheanstalk/PheanstalkFunctionalTest.php b/tests/Connection/Pheanstalk/PheanstalkFunctionalTest.php new file mode 100644 index 0000000..ee8edb6 --- /dev/null +++ b/tests/Connection/Pheanstalk/PheanstalkFunctionalTest.php @@ -0,0 +1,117 @@ +markTestSkipped('Pheanstalk server is not available'); + } + + $this->connection = new PheanstalkConnection('test', new JsonSerializer()); + $this->connection->setConfig([ + 'host' => $host, + 'port' => $port, + ]); + + $this->queueName = 'test_queue_' . bin2hex(random_bytes(5)); + } + + public function test_queuePushPop() + { + $queue = $this->connection->queue(); + + $this->assertEquals(0, $queue->count($this->queueName)); + $queue->push( + (new Message(['foo' => 'bar'])) + ->setQueue($this->queueName) + ); + $this->assertEquals(1, $queue->count($this->queueName)); + + $message = $queue->pop($this->queueName); + $this->assertEquals($this->queueName, $message->message()->queue()); + $this->assertSame(['foo' => 'bar'], $message->message()->data()); + $this->assertFalse($message->isRejected()); + $this->assertFalse($message->isDeleted()); + + $message->acknowledge(); + + $this->assertEquals(0, $queue->count($this->queueName)); + $this->assertNull($queue->pop($this->queueName, 1)); + } + + public function test_queueRelease() + { + $queue = $this->connection->queue(); + $this->assertSame(0, $queue->count($this->queueName)); + $queue->push( + (new Message(['foo' => 'bar'])) + ->setQueue($this->queueName) + ); + $this->assertSame(1, $queue->count($this->queueName)); + + $message = $queue->pop($this->queueName); + $this->assertSame(['foo' => 'bar'], $message->message()->data()); + $this->assertFalse($message->isRejected()); + $this->assertFalse($message->isDeleted()); + + $queue->release($message->message()); + $this->assertSame(1, $queue->count($this->queueName)); + $this->assertEquals($message, $queue->pop($this->queueName)); + $this->assertFalse($message->isRejected()); + $this->assertFalse($message->isDeleted()); + + $message->acknowledge(); + } + + public function test_queueStats() + { + $queue = $this->connection->queue(); + $stats = $queue->stats(); + + $this->assertIsArray($stats); + $this->assertArrayHasKey('queues', $stats); + $this->assertArrayHasKey('workers', $stats); + } + + public function test_topic() + { + $messages = []; + + $topic = $this->connection->topic(); + $topic->subscribe(['topic.test'], function (QueueEnvelope $envelope) use (&$messages) { + $messages[] = $envelope->message()->data(); + $envelope->acknowledge(); + }); + + if (pcntl_fork() === 0) { + usleep(50000); + $topic->publish( + (new Message(['foo' => 'bar'])) + ->setTopic('topic.test') + ); + exit; + } + + $topic->consume(2); + $this->assertEquals([['foo' => 'bar']], $messages); + } +} diff --git a/tests/Connection/Pheanstalk/PheanstalkQueueTest.php b/tests/Connection/Pheanstalk/PheanstalkQueueTest.php index 28dd54e..f4ff786 100644 --- a/tests/Connection/Pheanstalk/PheanstalkQueueTest.php +++ b/tests/Connection/Pheanstalk/PheanstalkQueueTest.php @@ -8,13 +8,20 @@ use Bdf\Queue\Message\Message; use Bdf\Queue\Message\QueuedMessage; use Bdf\Queue\Serializer\JsonSerializer; +use Pheanstalk\Contract\PheanstalkInterface; +use Pheanstalk\Contract\PheanstalkSubscriberInterface; +use Pheanstalk\Contract\ResponseInterface; use Pheanstalk\Exception\SocketException; use Pheanstalk\Job as PheanstalkJob; use Pheanstalk\Pheanstalk; -use Pheanstalk\PheanstalkInterface; +use Pheanstalk\Response\ArrayResponse; use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; +use function class_exists; +use function interface_exists; +use function method_exists; + /** * @group Bdf_Queue * @group Bdf_Queue_Connection @@ -37,6 +44,11 @@ class PheanstalkQueueTest extends TestCase */ public function setUp(): void { + if (interface_exists(PheanstalkSubscriberInterface::class)) { + $this->markTestSkipped('Pheanstalk >= 5 is not supported'); + } + + class_exists(PheanstalkConnection::class); // Autoload Pheanstalk classes to ensure that interface alias is defined $this->pheanstalk = $this->createMock(PheanstalkInterface::class); $connection = new PheanstalkConnection('foo', new JsonSerializer()); @@ -97,10 +109,10 @@ public function test_push_error($expected, $internal) $this->driver->push($message); } - public function provideExceptions() + public static function provideExceptions() { return [ - [ConnectionLostException::class, new SocketException()], + [ConnectionLostException::class, class_exists(SocketException::class) ? new SocketException() : new \Pheanstalk\Exception\ConnectionException(1, '')], [ServerException::class, new \Pheanstalk\Exception\ServerException()], [ConnectionException::class, new \Pheanstalk\Exception\ClientException()], ]; @@ -139,7 +151,12 @@ public function test_pop() $job->expects($this->once())->method('getData')->willReturn('{"data":"foo"}'); $this->pheanstalk->expects($this->once())->method('watchOnly')->with('queue')->willReturnSelf(); - $this->pheanstalk->expects($this->once())->method('reserve')->with(1)->willReturn($job); + + if (method_exists(Pheanstalk::class, 'reserveWithTimeout')) { + $this->pheanstalk->expects($this->once())->method('reserveWithTimeout')->with(1)->willReturn($job); + } else { + $this->pheanstalk->expects($this->once())->method('reserve')->with(1)->willReturn($job); + } $message = $this->driver->pop('queue', 1)->message(); @@ -156,7 +173,12 @@ public function test_pop() public function test_pop_end_of_queue() { $this->pheanstalk->expects($this->once())->method('watchOnly')->willReturnSelf(); - $this->pheanstalk->expects($this->once())->method('reserve')->willReturn(null); + + if (method_exists(Pheanstalk::class, 'reserveWithTimeout')) { + $this->pheanstalk->expects($this->once())->method('reserveWithTimeout')->willReturn(null); + } else { + $this->pheanstalk->expects($this->once())->method('reserve')->willReturn(null); + } $this->assertSame(null, $this->driver->pop('queue', 1)); } @@ -168,7 +190,11 @@ public function test_pop_error($expected, $internal) { $this->expectException($expected); $this->pheanstalk->expects($this->once())->method('watchOnly')->willReturnSelf(); - $this->pheanstalk->expects($this->once())->method('reserve')->willThrowException($internal); + if (method_exists(Pheanstalk::class, 'reserveWithTimeout')) { + $this->pheanstalk->expects($this->once())->method('reserveWithTimeout')->willThrowException($internal); + } else { + $this->pheanstalk->expects($this->once())->method('reserve')->willThrowException($internal); + } $this->driver->pop('queue', 1); } @@ -236,7 +262,7 @@ public function test_release_error($expected, $internal) */ public function test_count() { - $this->pheanstalk->expects($this->once())->method('statsTube')->with('queue')->willReturn(['current-jobs-ready' => 1]); + $this->pheanstalk->expects($this->once())->method('statsTube')->with('queue')->willReturn(new ArrayResponse('', ['current-jobs-ready' => 1])); $this->assertSame(1, $this->driver->count('queue')); }