From 41979e808fd1909317f2a9c6300c5cb86a7f0e4b Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Fri, 27 Feb 2026 14:22:33 +0100 Subject: [PATCH 1/4] Add queue pause/resume --- src/CloudTasksApi.php | 2 + src/CloudTasksApiConcrete.php | 12 ++++++ src/CloudTasksApiContract.php | 4 ++ src/CloudTasksApiFake.php | 25 +++++++++++++ src/CloudTasksQueue.php | 14 +++++++ src/CloudTasksServiceProvider.php | 23 ++++++++++++ tests/CloudTasksApiTest.php | 62 +++++++++++++++++++++++++++++++ tests/PauseResumeQueueTest.php | 46 +++++++++++++++++++++++ 8 files changed, 188 insertions(+) create mode 100644 tests/PauseResumeQueueTest.php diff --git a/src/CloudTasksApi.php b/src/CloudTasksApi.php index 0b961fa..d25d666 100644 --- a/src/CloudTasksApi.php +++ b/src/CloudTasksApi.php @@ -12,6 +12,8 @@ * @method static void deleteTask(string $taskName) * @method static Task getTask(string $taskName) * @method static bool exists(string $taskName) + * @method static void pause(string $queue) + * @method static void resume(string $queue) */ class CloudTasksApi extends Facade { diff --git a/src/CloudTasksApiConcrete.php b/src/CloudTasksApiConcrete.php index b62c61e..639f71b 100644 --- a/src/CloudTasksApiConcrete.php +++ b/src/CloudTasksApiConcrete.php @@ -9,6 +9,8 @@ use Google\Cloud\Tasks\V2\GetTaskRequest; use Google\Cloud\Tasks\V2\CreateTaskRequest; use Google\Cloud\Tasks\V2\DeleteTaskRequest; +use Google\Cloud\Tasks\V2\PauseQueueRequest; +use Google\Cloud\Tasks\V2\ResumeQueueRequest; use Google\Cloud\Tasks\V2\Client\CloudTasksClient; class CloudTasksApiConcrete implements CloudTasksApiContract @@ -65,4 +67,14 @@ public function exists(string $taskName): bool return false; } + + public function pause(string $queue): void + { + $this->client->pauseQueue(PauseQueueRequest::build($queue)); + } + + public function resume(string $queue): void + { + $this->client->resumeQueue(ResumeQueueRequest::build($queue)); + } } diff --git a/src/CloudTasksApiContract.php b/src/CloudTasksApiContract.php index 5f0af35..fdaa562 100644 --- a/src/CloudTasksApiContract.php +++ b/src/CloudTasksApiContract.php @@ -15,4 +15,8 @@ public function deleteTask(string $taskName): void; public function getTask(string $taskName): Task; public function exists(string $taskName): bool; + + public function pause(string $queue): void; + + public function resume(string $queue): void; } diff --git a/src/CloudTasksApiFake.php b/src/CloudTasksApiFake.php index 773fcdc..05b5cea 100644 --- a/src/CloudTasksApiFake.php +++ b/src/CloudTasksApiFake.php @@ -23,6 +23,11 @@ class CloudTasksApiFake implements CloudTasksApiContract */ public array $deletedTasks = []; + /** + * @var array + */ + public array $pausedQueues = []; + public function createTask(string $queueName, Task $task): Task { $this->createdTasks[] = compact('queueName', 'task'); @@ -51,6 +56,16 @@ public function exists(string $taskName): bool return false; } + public function pause(string $queue): void + { + $this->pausedQueues[$queue] = true; + } + + public function resume(string $queue): void + { + unset($this->pausedQueues[$queue]); + } + public function assertTaskDeleted(string $taskName): void { Assert::assertTrue( @@ -85,4 +100,14 @@ public function assertCreatedTaskCount(int $count): void { Assert::assertCount($count, $this->createdTasks); } + + public function assertQueuePaused(string $queue): void + { + Assert::assertTrue($this->pausedQueues[$queue] ?? null, 'Expected queue ['.$queue.'] to be paused, but is not'); + } + + public function assertQueueNotPaused(string $queue): void + { + Assert::assertNotTrue($this->pausedQueues[$queue] ?? null, 'Expected queue ['.$queue.'] to not be paused, but it is'); + } } diff --git a/src/CloudTasksQueue.php b/src/CloudTasksQueue.php index b23780b..11e1078 100644 --- a/src/CloudTasksQueue.php +++ b/src/CloudTasksQueue.php @@ -467,4 +467,18 @@ private function getCloudRunJobEnvVars(string $encodedPayload, string $taskName) return $envVars; } + + public function pause(string $queue): void + { + $queueName = CloudTasksClient::queueName($this->config['project'], $this->config['location'], $queue); + + CloudTasksApi::pause($queue); + } + + public function resume(string $queue): void + { + $queueName = CloudTasksClient::queueName($this->config['project'], $this->config['location'], $queue); + + CloudTasksApi::resume($queue); + } } diff --git a/src/CloudTasksServiceProvider.php b/src/CloudTasksServiceProvider.php index 8e4833a..8eaf1a4 100644 --- a/src/CloudTasksServiceProvider.php +++ b/src/CloudTasksServiceProvider.php @@ -7,8 +7,11 @@ use Illuminate\Routing\Router; use Illuminate\Events\Dispatcher; use Illuminate\Queue\QueueManager; +use Illuminate\Support\Facades\Queue; use Illuminate\Foundation\Application; use Illuminate\Queue\Events\JobFailed; +use Illuminate\Queue\Events\QueuePaused; +use Illuminate\Queue\Events\QueueResumed; use Illuminate\Contracts\Debug\ExceptionHandler; use Illuminate\Queue\Events\JobExceptionOccurred; use Google\Cloud\Tasks\V2\Client\CloudTasksClient; @@ -113,6 +116,26 @@ private function registerEvents(): void return; } }); + + $events->listen(QueuePaused::class, function (QueuePaused $event) { + $queue = Queue::connection($event->connection); + + if (! $queue instanceof CloudTasksQueue) { + return; + } + + $queue->pause($event->queue); + }); + + $events->listen(QueueResumed::class, function (QueueResumed $event) { + $queue = Queue::connection($event->connection); + + if (! $queue instanceof CloudTasksQueue) { + return; + } + + $queue->resume($event->queue); + }); } private function registerCommands(): void diff --git a/tests/CloudTasksApiTest.php b/tests/CloudTasksApiTest.php index 5b42338..a92cec7 100644 --- a/tests/CloudTasksApiTest.php +++ b/tests/CloudTasksApiTest.php @@ -9,7 +9,9 @@ use Google\ApiCore\ApiException; use Google\Cloud\Tasks\V2\HttpMethod; use Google\Cloud\Tasks\V2\HttpRequest; +use Google\Cloud\Tasks\V2\Queue\State; use PHPUnit\Framework\Attributes\Test; +use Google\Cloud\Tasks\V2\GetQueueRequest; use Google\Cloud\Tasks\V2\Client\CloudTasksClient; use Stackkit\LaravelGoogleCloudTasksQueue\CloudTasksApi; @@ -138,4 +140,64 @@ public function test_delete_task() $this->expectExceptionMessage('NOT_FOUND'); CloudTasksApi::getTask($task->getName()); } + + #[Test] + public function it_can_pause_queues(): void + { + $queueName = $this->client->queueName( + env('CI_CLOUD_TASKS_PROJECT_ID'), + env('CI_CLOUD_TASKS_LOCATION'), + env('CI_CLOUD_TASKS_QUEUE').'-pause' + ); + + $this->ensureQueueIs($queueName, State::RUNNING); + + // Act + CloudTasksApi::pause($queueName); + + // Assert + $this->assertEquals(State::PAUSED, $this->getQueueState($queueName)); + } + + #[Test] + public function it_can_resume_queues(): void + { + $queueName = $this->client->queueName( + env('CI_CLOUD_TASKS_PROJECT_ID'), + env('CI_CLOUD_TASKS_LOCATION'), + env('CI_CLOUD_TASKS_QUEUE').'-pause' + ); + + $this->ensureQueueIs($queueName, State::PAUSED); + + // Act + CloudTasksApi::resume($queueName); + + // Assert + $this->assertEquals(State::RUNNING, $this->getQueueState($queueName)); + } + + private function getQueueState(string $queue): int + { + return $this->client->getQueue(GetQueueRequest::build($queue))->getState(); + } + + private function ensureQueueIs(string $queue, int $desiredState): void + { + $currentState = $this->getQueueState($queue); + + if ($currentState === $desiredState) { + return; + } + + if ($currentState === State::RUNNING && $desiredState === State::PAUSED) { + CloudTasksApi::pause($queue); + } + + if ($currentState === State::PAUSED && $desiredState === State::RUNNING) { + CloudTasksApi::resume($queue); + } + + $this->assertEquals($desiredState, $this->getQueueState($queue)); + } } diff --git a/tests/PauseResumeQueueTest.php b/tests/PauseResumeQueueTest.php new file mode 100644 index 0000000..52315fa --- /dev/null +++ b/tests/PauseResumeQueueTest.php @@ -0,0 +1,46 @@ +version() < 12) { + $this->markTestSkipped('This feature only exists in Laravel 12 and up.'); + } + + CloudTasksApi::fake(); + + // $this->artisan('queue:pause cloudtasks:barbequeue'); + Artisan::call('queue:pause my-cloudtasks-connection:barbequeue'); + + // Assert + CloudTasksApi::assertQueuePaused('barbequeue'); + } + + #[Test] + public function queue_can_be_resumed(): void + { + // Arrange + if (app()->version() < 12) { + $this->markTestSkipped('This feature only exists in Laravel 12 and up.'); + } + + CloudTasksApi::fake(); + + Artisan::call('queue:pause my-cloudtasks-connection:barbequeue'); + Artisan::call('queue:continue my-cloudtasks-connection:barbequeue'); + + // Assert + CloudTasksApi::assertQueueNotPaused('barbequeue'); + } +} From 0e654d6947de0a8dc64db70bde9270bb785e865c Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Fri, 27 Feb 2026 14:26:01 +0100 Subject: [PATCH 2/4] Fix event listening for Laravel 11 --- src/CloudTasksServiceProvider.php | 32 +++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/src/CloudTasksServiceProvider.php b/src/CloudTasksServiceProvider.php index 8eaf1a4..c285053 100644 --- a/src/CloudTasksServiceProvider.php +++ b/src/CloudTasksServiceProvider.php @@ -117,25 +117,29 @@ private function registerEvents(): void } }); - $events->listen(QueuePaused::class, function (QueuePaused $event) { - $queue = Queue::connection($event->connection); + if (class_exists('Illuminate\Queue\Events\QueuePaused')) { + $events->listen(QueuePaused::class, function (QueuePaused $event) { + $queue = Queue::connection($event->connection); - if (! $queue instanceof CloudTasksQueue) { - return; - } + if (! $queue instanceof CloudTasksQueue) { + return; + } - $queue->pause($event->queue); - }); + $queue->pause($event->queue); + }); + } - $events->listen(QueueResumed::class, function (QueueResumed $event) { - $queue = Queue::connection($event->connection); + if (class_exists('Illuminate\Queue\Events\QueueResumed')) { + $events->listen(QueueResumed::class, function (QueueResumed $event) { + $queue = Queue::connection($event->connection); - if (! $queue instanceof CloudTasksQueue) { - return; - } + if (! $queue instanceof CloudTasksQueue) { + return; + } - $queue->resume($event->queue); - }); + $queue->resume($event->queue); + }); + } } private function registerCommands(): void From c9ddaaf0725511bbf808be773a293d021e5d9f0d Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Fri, 27 Feb 2026 14:34:15 +0100 Subject: [PATCH 3/4] Add PHPStan ignores --- src/CloudTasksServiceProvider.php | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/CloudTasksServiceProvider.php b/src/CloudTasksServiceProvider.php index c285053..01e79d0 100644 --- a/src/CloudTasksServiceProvider.php +++ b/src/CloudTasksServiceProvider.php @@ -118,26 +118,26 @@ private function registerEvents(): void }); if (class_exists('Illuminate\Queue\Events\QueuePaused')) { - $events->listen(QueuePaused::class, function (QueuePaused $event) { - $queue = Queue::connection($event->connection); + $events->listen(QueuePaused::class, function (QueuePaused $event) { // @phpstan-ignore-line + $queue = Queue::connection($event->connection); // @phpstan-ignore-line if (! $queue instanceof CloudTasksQueue) { return; } - $queue->pause($event->queue); + $queue->pause($event->queue); // @phpstan-ignore-line }); } if (class_exists('Illuminate\Queue\Events\QueueResumed')) { - $events->listen(QueueResumed::class, function (QueueResumed $event) { - $queue = Queue::connection($event->connection); + $events->listen(QueueResumed::class, function (QueueResumed $event) { // @phpstan-ignore-line + $queue = Queue::connection($event->connection); // @phpstan-ignore-line if (! $queue instanceof CloudTasksQueue) { return; } - $queue->resume($event->queue); + $queue->resume($event->queue); // @phpstan-ignore-line }); } } From fbfc76c9569ba2772a33b38b666680329eb91da2 Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Fri, 27 Feb 2026 15:14:59 +0100 Subject: [PATCH 4/4] Use matrix queue for tests --- .github/workflows/run-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index 4e085ea..01f7873 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -119,7 +119,7 @@ jobs: DB_DRIVER: ${{ matrix.db.driver }} DB_HOST: 127.0.0.1 CI_CLOUD_TASKS_PROJECT_ID: ${{ secrets.CI_CLOUD_TASKS_PROJECT_ID }} - CI_CLOUD_TASKS_QUEUE: ${{ secrets.CI_CLOUD_TASKS_QUEUE }} + CI_CLOUD_TASKS_QUEUE: ${{ matrix.payload.queue }} CI_CLOUD_TASKS_LOCATION: ${{ secrets.CI_CLOUD_TASKS_LOCATION }} CI_CLOUD_TASKS_SERVICE_ACCOUNT_EMAIL: ${{ secrets.CI_CLOUD_TASKS_SERVICE_ACCOUNT_EMAIL }} CI_SERVICE_ACCOUNT_JSON_KEY: ${{ secrets.CI_SERVICE_ACCOUNT_JSON_KEY }}