diff --git a/.github/workflows/php.yml b/.github/workflows/php.yml
index bae188b..8ed0e8f 100644
--- a/.github/workflows/php.yml
+++ b/.github/workflows/php.yml
@@ -40,6 +40,42 @@ jobs:
- name: Run test suite
run: composer run-script tests
+ pheanstalk_compatibility:
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ pheanstalk-versions: ['4.0', '5.0', '6.0', '7.0', '8.0']
+ name: Pheanstalk ${{ matrix.pheanstalk-versions }}
+
+ steps:
+ - uses: actions/checkout@v2
+
+ - name: Set Timezone
+ uses: szenius/set-timezone@v1.0
+ with:
+ timezoneLinux: "Europe/Paris"
+
+ - name: Install PHP
+ uses: shivammathur/setup-php@v2
+ with:
+ php-version: 8.4
+ extensions: json
+ ini-values: date.timezone=Europe/Paris
+
+ - name: Install & run Beanstalkd
+ run: |
+ sudo apt-get install -y beanstalkd
+ sudo systemctl start beanstalkd.service
+
+ - name: Install dependencies
+ run: composer install --prefer-dist --no-progress
+
+ - name: Require Pheanstalk version
+ run: composer require --dev "pda/pheanstalk:~${{ matrix.pheanstalk-versions }}"
+
+ - name: Run test suite
+ run: composer run-script tests
+
analysis:
name: Analysis
runs-on: ubuntu-latest
diff --git a/composer.json b/composer.json
index 3955864..08843b5 100644
--- a/composer.json
+++ b/composer.json
@@ -37,7 +37,7 @@
"enqueue/fs": "~0.9",
"league/container": "~3.0",
"monolog/monolog": "~2.0",
- "pda/pheanstalk": "^3.1@dev",
+ "pda/pheanstalk": "~4.0|~5.0|~6.0|~7.0|~8.0",
"php-amqplib/php-amqplib": "~3.0",
"phpbench/phpbench": "~0.0|~1.0",
"phpunit/phpunit": "~9.6",
@@ -60,7 +60,8 @@
"symfony/var-dumper": "VarDumper could be used for displaying failed message (~5.4|~6.0|~7.0)"
},
"conflict": {
- "enqueue/dsn": "0.10.25"
+ "enqueue/dsn": "0.10.25",
+ "pda/pheanstalk": "<4.0"
},
"scripts": {
"tests": "phpunit",
diff --git a/psalm.xml b/psalm.xml
index c108b07..a6fe30f 100644
--- a/psalm.xml
+++ b/psalm.xml
@@ -12,6 +12,9 @@
+
+
+
diff --git a/src/Connection/Pheanstalk/PheanstalkConnection.php b/src/Connection/Pheanstalk/PheanstalkConnection.php
index 86ca61d..06e0b27 100644
--- a/src/Connection/Pheanstalk/PheanstalkConnection.php
+++ b/src/Connection/Pheanstalk/PheanstalkConnection.php
@@ -11,9 +11,22 @@
use Bdf\Queue\Message\MessageSerializationTrait;
use Bdf\Queue\Serializer\SerializerInterface;
use Bdf\Queue\Util\MultiServer;
-use Pheanstalk\Connection;
+use Pheanstalk\Contract\PheanstalkManagerInterface;
use Pheanstalk\Pheanstalk;
-use Pheanstalk\PheanstalkInterface;
+use Pheanstalk\Contract\PheanstalkInterface;
+use Pheanstalk\Values\Timeout;
+
+use function class_alias;
+use function class_exists;
+use function fclose;
+use function fsockopen;
+use function interface_exists;
+use function method_exists;
+
+if (!interface_exists(PheanstalkInterface::class)) {
+ // Support for Pheanstalk 5
+ class_alias(PheanstalkManagerInterface::class, PheanstalkInterface::class);
+}
/**
* PheanstalkConnection
@@ -23,6 +36,9 @@ class PheanstalkConnection implements ConnectionDriverInterface
use ConnectionNamed;
use MessageSerializationTrait;
+ public const DEFAULT_PORT = 11300;
+ public const DEFAULT_TTR = 60; // 1 minute
+
/**
* @var PheanstalkInterface
*/
@@ -50,8 +66,8 @@ public function __construct(string $name, SerializerInterface $serializer)
*/
public function setConfig(array $config): void
{
- $this->config = MultiServer::prepareMultiServers($config, '127.0.0.1', PheanstalkInterface::DEFAULT_PORT) + [
- 'ttr' => PheanstalkInterface::DEFAULT_TTR,
+ $this->config = MultiServer::prepareMultiServers($config, '127.0.0.1', self::DEFAULT_PORT) + [
+ 'ttr' => self::DEFAULT_TTR,
'client-timeout' => null,
];
}
@@ -78,13 +94,20 @@ public function timeToRun(): ?int
* @return PheanstalkInterface
* @throws ServerNotAvailableException If no servers has been found
*/
- public function pheanstalk(): PheanstalkInterface
+ public function pheanstalk()
{
if ($this->pheanstalk === null) {
// Set the first available server
// Pheanstalk manage a lazy connection. We can instantiate the client here.
foreach ($this->getActiveHost() as $host => $port) {
- $this->pheanstalk = new Pheanstalk($host, $port, $this->config['client-timeout']);
+ $timeout = (int) ($this->config['client-timeout'] ?? 10);
+
+ if (class_exists(Timeout::class)) {
+ // Pheanstalk 5
+ $timeout = new Timeout($timeout);
+ }
+
+ $this->pheanstalk = Pheanstalk::create($host, (int) $port, $timeout);
break;
}
}
@@ -108,7 +131,11 @@ public function setPheanstalk(PheanstalkInterface $pheanstalk)
public function close(): void
{
if ($this->pheanstalk !== null) {
- $this->pheanstalk->getConnection()->disconnect();
+ if (method_exists($this->pheanstalk, 'disconnect')) {
+ // Pheanstalk 7
+ $this->pheanstalk->disconnect();
+ }
+
$this->pheanstalk = null;
}
}
@@ -143,7 +170,10 @@ public function getActiveHost()
$valid = [];
foreach ($this->config['hosts'] as $host => $port) {
- if ((new Connection($host, $port))->isServiceListening()) {
+ $stream = @fsockopen($host, $port, $errno, $errstr, 0.1);
+
+ if ($stream !== false) {
+ fclose($stream);
$valid[$host] = $port;
}
}
diff --git a/src/Connection/Pheanstalk/PheanstalkQueue.php b/src/Connection/Pheanstalk/PheanstalkQueue.php
index af0480f..4e2de69 100644
--- a/src/Connection/Pheanstalk/PheanstalkQueue.php
+++ b/src/Connection/Pheanstalk/PheanstalkQueue.php
@@ -5,7 +5,6 @@
use Bdf\Queue\Connection\ConnectionDriverInterface;
use Bdf\Queue\Connection\CountableQueueDriverInterface;
use Bdf\Queue\Connection\Exception\ConnectionException;
-use Bdf\Queue\Connection\Exception\ConnectionFailedException;
use Bdf\Queue\Connection\Exception\ConnectionLostException;
use Bdf\Queue\Connection\Exception\ServerException;
use Bdf\Queue\Connection\Extension\ConnectionBearer;
@@ -16,10 +15,17 @@
use Bdf\Queue\Message\QueuedMessage;
use Exception;
use Pheanstalk\Exception\ClientException;
+use Pheanstalk\Exception\ConnectionException as PheanstalkConnectionException;
use Pheanstalk\Exception\ServerException as BaseServerException;
use Pheanstalk\Exception\SocketException;
use Pheanstalk\Job as PheanstalkJob;
use Pheanstalk\Pheanstalk;
+use Pheanstalk\Values\Job as Pheanstalk5Job;
+use Pheanstalk\Values\TubeName;
+use Pheanstalk\Values\TubeStats;
+
+use function class_exists;
+use function method_exists;
/**
* PheanstalkDriver
@@ -46,15 +52,23 @@ public function push(Message $message): void
{
$message->setQueuedAt(new \DateTimeImmutable());
$pheanstalk = $this->connection->pheanstalk();
+ $queue = $message->queue();
+
+ if (class_exists(TubeName::class)) {
+ // Support for Pheanstalk 5
+ $queue = new TubeName($queue);
+ }
try {
- $pheanstalk->useTube($message->queue())->put(
+ $pheanstalk->useTube($queue);
+
+ $pheanstalk->put(
$this->connection->serializer()->serialize($message),
$message->header('priority', Pheanstalk::DEFAULT_PRIORITY),
$message->delay(),
$message->header('ttr', $this->connection->timeToRun())
);
- } catch (SocketException $e) {
+ } catch (SocketException|PheanstalkConnectionException $e) {
throw new ConnectionLostException($e->getMessage(), $e->getCode(), $e);
} catch (BaseServerException $e) {
throw new ServerException($e->getMessage(), $e->getCode(), $e);
@@ -70,14 +84,21 @@ public function pushRaw($raw, string $queue, int $delay = 0): void
{
$pheanstalk = $this->connection->pheanstalk();
+ if (class_exists(TubeName::class)) {
+ // Support for Pheanstalk 5
+ $queue = new TubeName($queue);
+ }
+
try {
- $pheanstalk->useTube($queue)->put(
+ $pheanstalk->useTube($queue);
+
+ $pheanstalk->put(
$raw,
Pheanstalk::DEFAULT_PRIORITY,
$delay,
$this->connection->timeToRun()
);
- } catch (SocketException $e) {
+ } catch (SocketException|PheanstalkConnectionException $e) {
throw new ConnectionLostException($e->getMessage(), $e->getCode(), $e);
} catch (BaseServerException $e) {
throw new ServerException($e->getMessage(), $e->getCode(), $e);
@@ -93,9 +114,28 @@ public function pop(string $queue, int $duration = ConnectionDriverInterface::DU
{
$pheanstalk = $this->connection->pheanstalk();
+ if (class_exists(TubeName::class)) {
+ // Support for Pheanstalk 5
+ $queue = new TubeName($queue);
+ }
+
try {
- $job = $pheanstalk->watchOnly($queue)->reserve($duration);
- } catch (SocketException $e) {
+ if (method_exists($pheanstalk, 'watchOnly')) {
+ // Pheanstalk < 5
+ $pheanstalk->watchOnly($queue);
+ } else {
+ // Pheanstalk 5
+ $pheanstalk->watch($queue);
+
+ foreach ($pheanstalk->listTubesWatched() as $tube) {
+ if ($tube != $queue) {
+ $pheanstalk->ignore($tube);
+ }
+ }
+ }
+
+ $job = $pheanstalk->reserveWithTimeout($duration);
+ } catch (SocketException|PheanstalkConnectionException $e) {
throw new ConnectionLostException($e->getMessage(), $e->getCode(), $e);
} catch (BaseServerException $e) {
throw new ServerException($e->getMessage(), $e->getCode(), $e);
@@ -103,7 +143,7 @@ public function pop(string $queue, int $duration = ConnectionDriverInterface::DU
throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
}
- if (!$job instanceof PheanstalkJob) {
+ if (!$job instanceof PheanstalkJob && !$job instanceof Pheanstalk5Job) {
return null;
}
@@ -119,7 +159,7 @@ public function acknowledge(QueuedMessage $message): void
{
try {
$this->connection->pheanstalk()->delete($message->internalJob());
- } catch (SocketException $e) {
+ } catch (SocketException|PheanstalkConnectionException $e) {
throw new ConnectionLostException($e->getMessage(), $e->getCode(), $e);
} catch (BaseServerException $e) {
throw new ServerException($e->getMessage(), $e->getCode(), $e);
@@ -139,7 +179,7 @@ public function release(QueuedMessage $message): void
$message->header('priority', Pheanstalk::DEFAULT_PRIORITY),
$message->delay()
);
- } catch (SocketException $e) {
+ } catch (SocketException|PheanstalkConnectionException $e) {
throw new ConnectionLostException($e->getMessage(), $e->getCode(), $e);
} catch (BaseServerException $e) {
throw new ServerException($e->getMessage(), $e->getCode(), $e);
@@ -153,8 +193,19 @@ public function release(QueuedMessage $message): void
*/
public function count(string $name): int
{
+ if (class_exists(TubeName::class)) {
+ // Support for Pheanstalk 5
+ $name = new TubeName($name);
+ }
+
try {
- return $this->connection->pheanstalk()->statsTube($name)['current-jobs-ready'];
+ $stats = $this->connection->pheanstalk()->statsTube($name);
+
+ if ($stats instanceof TubeStats) {
+ return $stats->currentJobsReady;
+ } else {
+ return $stats['current-jobs-ready'];
+ }
} catch (Exception $e) {
return 0;
}
@@ -169,12 +220,12 @@ public function stats(): array
$workersInfo = [];
foreach ($this->connection->getActiveHost() as $host => $port) {
- $pheanstalk = new Pheanstalk($host, $port);
+ $pheanstalk = Pheanstalk::create($host, (int) $port);
try {
- $queuesInfo = array_merge($queuesInfo, $this->queuesInfo($pheanstalk));
- $workersInfo = array_merge($workersInfo, $this->workersInfo($pheanstalk));
- } catch (SocketException $e) {
+ $queuesInfo = array_merge($queuesInfo, $this->queuesInfo($pheanstalk, $host, $port));
+ $workersInfo = array_merge($workersInfo, $this->workersInfo($pheanstalk, $host, $port));
+ } catch (SocketException|PheanstalkConnectionException $e) {
throw new ConnectionLostException($e->getMessage(), $e->getCode(), $e);
} catch (BaseServerException $e) {
throw new ServerException($e->getMessage(), $e->getCode(), $e);
@@ -196,27 +247,43 @@ public function stats(): array
*
* @return array
*/
- private function queuesInfo($pheanstalk)
+ private function queuesInfo($pheanstalk, string $host, int $port): array
{
$status = [];
foreach ($pheanstalk->listTubes() as $tube) {
try {
- /** @var \Pheanstalk\Response\ArrayResponse $stats */
+ /** @var \Pheanstalk\Response\ArrayResponse|TubeStats $stats */
$stats = $pheanstalk->statsTube($tube);
- $status[] = [
- 'host' => $pheanstalk->getConnection()->getHost().':'.$pheanstalk->getConnection()->getPort(),
- 'queue' => $stats['name'],
- 'jobs in queue' => $stats['current-jobs-ready'],
- 'jobs running' => $stats['current-jobs-reserved'],
- 'jobs delayed' => $stats['current-jobs-delayed'],
-// 'jobs buried' => $stats['current-jobs-buried'],
- 'total jobs' => $stats['total-jobs'],
-// 'workers using' => $stats['current-using'],
- 'workers waiting' => $stats['current-waiting'],
- 'workers watching' => --$stats['current-watching'], // remove the monitoring
- ];
+ if ($stats instanceof TubeStats) {
+ // Pheanstalk 5
+ $status[] = [
+ 'host' => $host.':'.$port,
+ 'queue' => $stats->name->value,
+ 'jobs in queue' => $stats->currentJobsReady,
+ 'jobs running' => $stats->currentJobsReserved,
+ 'jobs delayed' => $stats->currentJobsDelayed,
+ // 'jobs buried' => $stats['current-jobs-buried'],
+ 'total jobs' => $stats->totalJobs,
+ // 'workers using' => $stats['current-using'],
+ 'workers waiting' => $stats->currentWaiting,
+ 'workers watching' => $stats->currentWatching - 1, // remove the monitoring
+ ];
+ } else {
+ $status[] = [
+ 'host' => $host.':'.$port,
+ 'queue' => $stats['name'],
+ 'jobs in queue' => $stats['current-jobs-ready'],
+ 'jobs running' => $stats['current-jobs-reserved'],
+ 'jobs delayed' => $stats['current-jobs-delayed'],
+ // 'jobs buried' => $stats['current-jobs-buried'],
+ 'total jobs' => $stats['total-jobs'],
+ // 'workers using' => $stats['current-using'],
+ 'workers waiting' => $stats['current-waiting'],
+ 'workers watching' => --$stats['current-watching'], // remove the monitoring
+ ];
+ }
} catch (Exception $e) {
// tube not found
}
@@ -232,13 +299,13 @@ private function queuesInfo($pheanstalk)
*
* @return array
*/
- private function workersInfo($pheanstalk)
+ private function workersInfo($pheanstalk, string $host, int $port): array
{
$jobs = [];
foreach ($pheanstalk->listTubes() as $tube) {
$job = [
- 'host' => $pheanstalk->getConnection()->getHost().':'.$pheanstalk->getConnection()->getPort(),
+ 'host' => $host.':'.$port,
'queue' => $tube,
'job ready id' => '',
'job ready data' => '',
diff --git a/tests/Connection/Pheanstalk/PheanstalkConnectionTest.php b/tests/Connection/Pheanstalk/PheanstalkConnectionTest.php
index 10ffc44..6e7e93e 100644
--- a/tests/Connection/Pheanstalk/PheanstalkConnectionTest.php
+++ b/tests/Connection/Pheanstalk/PheanstalkConnectionTest.php
@@ -5,10 +5,15 @@
use Bdf\Queue\Connection\Generic\GenericTopic;
use Bdf\Queue\Serializer\JsonSerializer;
use Pheanstalk\Connection;
-use Pheanstalk\PheanstalkInterface;
+use Pheanstalk\Contract\PheanstalkInterface;
+use Pheanstalk\Contract\PheanstalkSubscriberInterface;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
+use function class_exists;
+use function interface_exists;
+use function method_exists;
+
/**
* @group Bdf_Queue
* @group Bdf_Queue_Connection
@@ -31,6 +36,11 @@ class PheanstalkConnectionTest extends TestCase
*/
public function setUp(): void
{
+ if (interface_exists(PheanstalkSubscriberInterface::class)) {
+ $this->markTestSkipped('Pheanstalk >= 5 is not supported');
+ }
+
+ class_exists(PheanstalkConnection::class); // Autoload Pheanstalk classes to ensure that interface alias is defined
$this->pheanstalk = $this->createMock(PheanstalkInterface::class);
$this->connection = new PheanstalkConnection('foo', new JsonSerializer());
@@ -70,9 +80,7 @@ public function test_set_get_pheanstalk()
*/
public function test_close()
{
- $connection = $this->createMock(Connection::class);
- $connection->expects($this->once())->method('disconnect');
- $this->pheanstalk->expects($this->once())->method('getConnection')->willReturn($connection);
+ $this->expectNotToPerformAssertions();
$this->connection->close();
// close once
diff --git a/tests/Connection/Pheanstalk/PheanstalkFunctionalTest.php b/tests/Connection/Pheanstalk/PheanstalkFunctionalTest.php
new file mode 100644
index 0000000..ee8edb6
--- /dev/null
+++ b/tests/Connection/Pheanstalk/PheanstalkFunctionalTest.php
@@ -0,0 +1,117 @@
+markTestSkipped('Pheanstalk server is not available');
+ }
+
+ $this->connection = new PheanstalkConnection('test', new JsonSerializer());
+ $this->connection->setConfig([
+ 'host' => $host,
+ 'port' => $port,
+ ]);
+
+ $this->queueName = 'test_queue_' . bin2hex(random_bytes(5));
+ }
+
+ public function test_queuePushPop()
+ {
+ $queue = $this->connection->queue();
+
+ $this->assertEquals(0, $queue->count($this->queueName));
+ $queue->push(
+ (new Message(['foo' => 'bar']))
+ ->setQueue($this->queueName)
+ );
+ $this->assertEquals(1, $queue->count($this->queueName));
+
+ $message = $queue->pop($this->queueName);
+ $this->assertEquals($this->queueName, $message->message()->queue());
+ $this->assertSame(['foo' => 'bar'], $message->message()->data());
+ $this->assertFalse($message->isRejected());
+ $this->assertFalse($message->isDeleted());
+
+ $message->acknowledge();
+
+ $this->assertEquals(0, $queue->count($this->queueName));
+ $this->assertNull($queue->pop($this->queueName, 1));
+ }
+
+ public function test_queueRelease()
+ {
+ $queue = $this->connection->queue();
+ $this->assertSame(0, $queue->count($this->queueName));
+ $queue->push(
+ (new Message(['foo' => 'bar']))
+ ->setQueue($this->queueName)
+ );
+ $this->assertSame(1, $queue->count($this->queueName));
+
+ $message = $queue->pop($this->queueName);
+ $this->assertSame(['foo' => 'bar'], $message->message()->data());
+ $this->assertFalse($message->isRejected());
+ $this->assertFalse($message->isDeleted());
+
+ $queue->release($message->message());
+ $this->assertSame(1, $queue->count($this->queueName));
+ $this->assertEquals($message, $queue->pop($this->queueName));
+ $this->assertFalse($message->isRejected());
+ $this->assertFalse($message->isDeleted());
+
+ $message->acknowledge();
+ }
+
+ public function test_queueStats()
+ {
+ $queue = $this->connection->queue();
+ $stats = $queue->stats();
+
+ $this->assertIsArray($stats);
+ $this->assertArrayHasKey('queues', $stats);
+ $this->assertArrayHasKey('workers', $stats);
+ }
+
+ public function test_topic()
+ {
+ $messages = [];
+
+ $topic = $this->connection->topic();
+ $topic->subscribe(['topic.test'], function (QueueEnvelope $envelope) use (&$messages) {
+ $messages[] = $envelope->message()->data();
+ $envelope->acknowledge();
+ });
+
+ if (pcntl_fork() === 0) {
+ usleep(50000);
+ $topic->publish(
+ (new Message(['foo' => 'bar']))
+ ->setTopic('topic.test')
+ );
+ exit;
+ }
+
+ $topic->consume(2);
+ $this->assertEquals([['foo' => 'bar']], $messages);
+ }
+}
diff --git a/tests/Connection/Pheanstalk/PheanstalkQueueTest.php b/tests/Connection/Pheanstalk/PheanstalkQueueTest.php
index 28dd54e..f4ff786 100644
--- a/tests/Connection/Pheanstalk/PheanstalkQueueTest.php
+++ b/tests/Connection/Pheanstalk/PheanstalkQueueTest.php
@@ -8,13 +8,20 @@
use Bdf\Queue\Message\Message;
use Bdf\Queue\Message\QueuedMessage;
use Bdf\Queue\Serializer\JsonSerializer;
+use Pheanstalk\Contract\PheanstalkInterface;
+use Pheanstalk\Contract\PheanstalkSubscriberInterface;
+use Pheanstalk\Contract\ResponseInterface;
use Pheanstalk\Exception\SocketException;
use Pheanstalk\Job as PheanstalkJob;
use Pheanstalk\Pheanstalk;
-use Pheanstalk\PheanstalkInterface;
+use Pheanstalk\Response\ArrayResponse;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
+use function class_exists;
+use function interface_exists;
+use function method_exists;
+
/**
* @group Bdf_Queue
* @group Bdf_Queue_Connection
@@ -37,6 +44,11 @@ class PheanstalkQueueTest extends TestCase
*/
public function setUp(): void
{
+ if (interface_exists(PheanstalkSubscriberInterface::class)) {
+ $this->markTestSkipped('Pheanstalk >= 5 is not supported');
+ }
+
+ class_exists(PheanstalkConnection::class); // Autoload Pheanstalk classes to ensure that interface alias is defined
$this->pheanstalk = $this->createMock(PheanstalkInterface::class);
$connection = new PheanstalkConnection('foo', new JsonSerializer());
@@ -97,10 +109,10 @@ public function test_push_error($expected, $internal)
$this->driver->push($message);
}
- public function provideExceptions()
+ public static function provideExceptions()
{
return [
- [ConnectionLostException::class, new SocketException()],
+ [ConnectionLostException::class, class_exists(SocketException::class) ? new SocketException() : new \Pheanstalk\Exception\ConnectionException(1, '')],
[ServerException::class, new \Pheanstalk\Exception\ServerException()],
[ConnectionException::class, new \Pheanstalk\Exception\ClientException()],
];
@@ -139,7 +151,12 @@ public function test_pop()
$job->expects($this->once())->method('getData')->willReturn('{"data":"foo"}');
$this->pheanstalk->expects($this->once())->method('watchOnly')->with('queue')->willReturnSelf();
- $this->pheanstalk->expects($this->once())->method('reserve')->with(1)->willReturn($job);
+
+ if (method_exists(Pheanstalk::class, 'reserveWithTimeout')) {
+ $this->pheanstalk->expects($this->once())->method('reserveWithTimeout')->with(1)->willReturn($job);
+ } else {
+ $this->pheanstalk->expects($this->once())->method('reserve')->with(1)->willReturn($job);
+ }
$message = $this->driver->pop('queue', 1)->message();
@@ -156,7 +173,12 @@ public function test_pop()
public function test_pop_end_of_queue()
{
$this->pheanstalk->expects($this->once())->method('watchOnly')->willReturnSelf();
- $this->pheanstalk->expects($this->once())->method('reserve')->willReturn(null);
+
+ if (method_exists(Pheanstalk::class, 'reserveWithTimeout')) {
+ $this->pheanstalk->expects($this->once())->method('reserveWithTimeout')->willReturn(null);
+ } else {
+ $this->pheanstalk->expects($this->once())->method('reserve')->willReturn(null);
+ }
$this->assertSame(null, $this->driver->pop('queue', 1));
}
@@ -168,7 +190,11 @@ public function test_pop_error($expected, $internal)
{
$this->expectException($expected);
$this->pheanstalk->expects($this->once())->method('watchOnly')->willReturnSelf();
- $this->pheanstalk->expects($this->once())->method('reserve')->willThrowException($internal);
+ if (method_exists(Pheanstalk::class, 'reserveWithTimeout')) {
+ $this->pheanstalk->expects($this->once())->method('reserveWithTimeout')->willThrowException($internal);
+ } else {
+ $this->pheanstalk->expects($this->once())->method('reserve')->willThrowException($internal);
+ }
$this->driver->pop('queue', 1);
}
@@ -236,7 +262,7 @@ public function test_release_error($expected, $internal)
*/
public function test_count()
{
- $this->pheanstalk->expects($this->once())->method('statsTube')->with('queue')->willReturn(['current-jobs-ready' => 1]);
+ $this->pheanstalk->expects($this->once())->method('statsTube')->with('queue')->willReturn(new ArrayResponse('', ['current-jobs-ready' => 1]));
$this->assertSame(1, $this->driver->count('queue'));
}