Skip to content
Merged
36 changes: 36 additions & 0 deletions .github/workflows/php.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,42 @@ jobs:
- name: Run test suite
run: composer run-script tests

pheanstalk_compatibility:
runs-on: ubuntu-latest
strategy:
matrix:
pheanstalk-versions: ['4.0', '5.0', '6.0', '7.0', '8.0']
name: Pheanstalk ${{ matrix.pheanstalk-versions }}

steps:
- uses: actions/checkout@v2

- name: Set Timezone
uses: szenius/set-timezone@v1.0
with:
timezoneLinux: "Europe/Paris"

- name: Install PHP
uses: shivammathur/setup-php@v2
with:
php-version: 8.4
extensions: json
ini-values: date.timezone=Europe/Paris

- name: Install & run Beanstalkd
run: |
sudo apt-get install -y beanstalkd
sudo systemctl start beanstalkd.service

- name: Install dependencies
run: composer install --prefer-dist --no-progress

- name: Require Pheanstalk version
run: composer require --dev "pda/pheanstalk:~${{ matrix.pheanstalk-versions }}"

- name: Run test suite
run: composer run-script tests

analysis:
name: Analysis
runs-on: ubuntu-latest
Expand Down
5 changes: 3 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
"enqueue/fs": "~0.9",
"league/container": "~3.0",
"monolog/monolog": "~2.0",
"pda/pheanstalk": "^3.1@dev",
"pda/pheanstalk": "~4.0|~5.0|~6.0|~7.0|~8.0",
"php-amqplib/php-amqplib": "~3.0",
"phpbench/phpbench": "~0.0|~1.0",
"phpunit/phpunit": "~9.6",
Expand All @@ -60,7 +60,8 @@
"symfony/var-dumper": "VarDumper could be used for displaying failed message (~5.4|~6.0|~7.0)"
},
"conflict": {
"enqueue/dsn": "0.10.25"
"enqueue/dsn": "0.10.25",
"pda/pheanstalk": "<4.0"
},
"scripts": {
"tests": "phpunit",
Expand Down
3 changes: 3 additions & 0 deletions psalm.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
<directory name="src" />
<ignoreFiles>
<directory name="vendor" />

<!-- Too much hack to get pheanstalk v3-5 works, so disable psalm until legacy is removed -->
<directory name="src/Connection/Pheanstalk" />
</ignoreFiles>
</projectFiles>

Expand Down
46 changes: 38 additions & 8 deletions src/Connection/Pheanstalk/PheanstalkConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,22 @@
use Bdf\Queue\Message\MessageSerializationTrait;
use Bdf\Queue\Serializer\SerializerInterface;
use Bdf\Queue\Util\MultiServer;
use Pheanstalk\Connection;
use Pheanstalk\Contract\PheanstalkManagerInterface;
use Pheanstalk\Pheanstalk;
use Pheanstalk\PheanstalkInterface;
use Pheanstalk\Contract\PheanstalkInterface;
use Pheanstalk\Values\Timeout;

use function class_alias;
use function class_exists;
use function fclose;
use function fsockopen;
use function interface_exists;
use function method_exists;

if (!interface_exists(PheanstalkInterface::class)) {
// Support for Pheanstalk 5
class_alias(PheanstalkManagerInterface::class, PheanstalkInterface::class);
}

/**
* PheanstalkConnection
Expand All @@ -23,6 +36,9 @@ class PheanstalkConnection implements ConnectionDriverInterface
use ConnectionNamed;
use MessageSerializationTrait;

public const DEFAULT_PORT = 11300;
public const DEFAULT_TTR = 60; // 1 minute

/**
* @var PheanstalkInterface
*/
Expand Down Expand Up @@ -50,8 +66,8 @@ public function __construct(string $name, SerializerInterface $serializer)
*/
public function setConfig(array $config): void
{
$this->config = MultiServer::prepareMultiServers($config, '127.0.0.1', PheanstalkInterface::DEFAULT_PORT) + [
'ttr' => PheanstalkInterface::DEFAULT_TTR,
$this->config = MultiServer::prepareMultiServers($config, '127.0.0.1', self::DEFAULT_PORT) + [
'ttr' => self::DEFAULT_TTR,
'client-timeout' => null,
];
}
Expand All @@ -78,13 +94,20 @@ public function timeToRun(): ?int
* @return PheanstalkInterface
* @throws ServerNotAvailableException If no servers has been found
*/
public function pheanstalk(): PheanstalkInterface
public function pheanstalk()
{
if ($this->pheanstalk === null) {
// Set the first available server
// Pheanstalk manage a lazy connection. We can instantiate the client here.
foreach ($this->getActiveHost() as $host => $port) {
$this->pheanstalk = new Pheanstalk($host, $port, $this->config['client-timeout']);
$timeout = (int) ($this->config['client-timeout'] ?? 10);

if (class_exists(Timeout::class)) {
// Pheanstalk 5
$timeout = new Timeout($timeout);
}

$this->pheanstalk = Pheanstalk::create($host, (int) $port, $timeout);
break;
}
}
Expand All @@ -108,7 +131,11 @@ public function setPheanstalk(PheanstalkInterface $pheanstalk)
public function close(): void
{
if ($this->pheanstalk !== null) {
$this->pheanstalk->getConnection()->disconnect();
if (method_exists($this->pheanstalk, 'disconnect')) {
// Pheanstalk 7
$this->pheanstalk->disconnect();
}

$this->pheanstalk = null;
}
}
Expand Down Expand Up @@ -143,7 +170,10 @@ public function getActiveHost()
$valid = [];

foreach ($this->config['hosts'] as $host => $port) {
if ((new Connection($host, $port))->isServiceListening()) {
$stream = @fsockopen($host, $port, $errno, $errstr, 0.1);

if ($stream !== false) {
fclose($stream);
$valid[$host] = $port;
}
}
Expand Down
Loading