diff --git a/README.md b/README.md index 2e416b4..8d65d30 100644 --- a/README.md +++ b/README.md @@ -133,6 +133,24 @@ php artisan migrate --database=surreal --path=database/migrations/0001_01_01_000 - Session read, write, update, and expiry behavior are covered in the test suite against a real Surreal runtime. - This driver intentionally follows Laravel's normal database-session lifecycle, so expiry cleanup still relies on Laravel's standard session lottery / pruning behavior instead of Surreal-native TTL features. +### Surreal-Backed Queues + +Katra now also exposes a first-class `surreal` Laravel queue connection for teams that want jobs to live in SurrealDB alongside the rest of the application state. + +- Set `QUEUE_CONNECTION=surreal` to use the Surreal-backed queue connector. +- The connector defaults to the `surreal` database connection, but you can override the queue connection, table, queue name, and retry window with `SURREAL_QUEUE_CONNECTION`, `SURREAL_QUEUE_TABLE`, `SURREAL_QUEUE`, and `SURREAL_QUEUE_RETRY_AFTER`. +- If you also want failed job records in SurrealDB, set `QUEUE_FAILED_DRIVER=database-uuids` and `QUEUE_FAILED_DATABASE=surreal`. +- Make sure the queue tables exist on the Surreal connection before you start a worker. Katra's queue migration also creates `job_batches` and `failed_jobs` alongside `jobs`: + +```bash +php artisan migrate --database=surreal --path=database/migrations/0001_01_01_000002_create_jobs_table.php +``` + +- Start a worker against the Surreal connection with `php artisan queue:work surreal`. +- Queue enqueue, reserve, complete, retry, and failed-job behavior are covered in the test suite against a real Surreal runtime. +- This connector currently uses optimistic job reservation instead of SQL row locks and database transactions, so it is best suited to Katra's current low-contention worker model rather than high-volume multi-worker contention scenarios. +- `after_commit` is not supported on the Surreal queue connection because the Surreal database connection does not expose SQL transaction semantics. + ## Planning Docs - [Katra v2 Overview](docs/v2-overview.md) diff --git a/app/Providers/AppServiceProvider.php b/app/Providers/AppServiceProvider.php index c3a6f82..db48dd1 100644 --- a/app/Providers/AppServiceProvider.php +++ b/app/Providers/AppServiceProvider.php @@ -3,12 +3,14 @@ namespace App\Providers; use App\Services\Surreal\Migrations\SurrealMigrationRepository; +use App\Services\Surreal\Queue\SurrealQueueConnector; use App\Services\Surreal\Schema\SurrealSchemaConnection; use App\Services\Surreal\SurrealConnection; use App\Services\Surreal\SurrealDocumentStore; use App\Services\Surreal\SurrealHttpClient; use App\Services\Surreal\SurrealRuntimeManager; use Illuminate\Database\DatabaseManager; +use Illuminate\Queue\QueueManager; use Illuminate\Session\DatabaseSessionHandler; use Illuminate\Support\ServiceProvider; @@ -50,5 +52,11 @@ public function boot(): void $app, ); }); + + $this->app->afterResolving('queue', function (QueueManager $manager): void { + $manager->addConnector('surreal', function (): SurrealQueueConnector { + return new SurrealQueueConnector($this->app['db']); + }); + }); } } diff --git a/app/Services/Surreal/Queue/SurrealQueue.php b/app/Services/Surreal/Queue/SurrealQueue.php new file mode 100644 index 0000000..3379b0d --- /dev/null +++ b/app/Services/Surreal/Queue/SurrealQueue.php @@ -0,0 +1,112 @@ +getQueue($queue); + + foreach ($this->nextAvailableJobs($queue) as $jobRecord) { + $reservedJob = $this->attemptToReserve($jobRecord); + + if ($reservedJob !== null) { + return new DatabaseJob( + $this->container, + $this, + $reservedJob, + $this->connectionName, + $queue, + ); + } + } + + return null; + } + + public function deleteReserved($queue, $id): void + { + $this->database->table($this->table) + ->where('id', $id) + ->delete(); + } + + public function deleteAndRelease($queue, $job, $delay): void + { + $this->deleteReserved($queue, $job->getJobId()); + + $this->release($queue, $job->getJobRecord(), $delay); + } + + /** + * @return list + */ + private function nextAvailableJobs(string $queue): array + { + $expiration = Carbon::now()->subSeconds($this->retryAfter)->getTimestamp(); + + return $this->database->table($this->table) + ->where('queue', $queue) + ->where(function ($query) use ($expiration): void { + $query->where(function ($query): void { + $query->whereNull('reserved_at') + ->where('available_at', '<=', $this->currentTime()); + })->orWhere(function ($query) use ($expiration): void { + $query->whereNotNull('reserved_at') + ->where('reserved_at', '<=', $expiration); + }); + }) + ->orderBy('id', 'asc') + ->limit(5) + ->get() + ->map(static function (stdClass $record): DatabaseJobRecord { + $record->reserved_at ??= null; + + return new DatabaseJobRecord($record); + }) + ->all(); + } + + private function attemptToReserve(DatabaseJobRecord $job): ?DatabaseJobRecord + { + $reservedAt = $this->currentTime(); + $attempts = $job->attempts + 1; + $existingReservedAt = $job->reserved_at ?? null; + + $query = $this->database->table($this->table) + ->where('id', $job->id); + + if ($existingReservedAt === null) { + $query->whereNull('reserved_at') + ->where('available_at', '<=', $reservedAt); + } else { + $query->where('reserved_at', $existingReservedAt); + } + + $updated = $query->update([ + 'reserved_at' => $reservedAt, + 'attempts' => $attempts, + ]); + + if ($updated !== 1) { + return null; + } + + return new DatabaseJobRecord((object) [ + 'id' => $job->id, + 'queue' => $job->queue, + 'payload' => $job->payload, + 'attempts' => $attempts, + 'reserved_at' => $reservedAt, + 'available_at' => $job->available_at, + 'created_at' => $job->created_at, + ]); + } +} diff --git a/app/Services/Surreal/Queue/SurrealQueueConnector.php b/app/Services/Surreal/Queue/SurrealQueueConnector.php new file mode 100644 index 0000000..64c136a --- /dev/null +++ b/app/Services/Surreal/Queue/SurrealQueueConnector.php @@ -0,0 +1,24 @@ +connections->connection($config['connection'] ?? 'surreal'), + $config['table'], + $config['queue'], + $config['retry_after'] ?? 60, + $config['after_commit'] ?? false, + ); + } +} diff --git a/config/queue.php b/config/queue.php index 79c2c0a..bab65c0 100644 --- a/config/queue.php +++ b/config/queue.php @@ -24,7 +24,7 @@ | used by your application. An example configuration is provided for | each backend supported by Laravel. You're also free to add more. | - | Drivers: "sync", "database", "beanstalkd", "sqs", "redis", + | Drivers: "sync", "database", "surreal", "beanstalkd", "sqs", "redis", | "deferred", "background", "failover", "null" | */ @@ -44,6 +44,15 @@ 'after_commit' => false, ], + 'surreal' => [ + 'driver' => 'surreal', + 'connection' => env('SURREAL_QUEUE_CONNECTION', 'surreal'), + 'table' => env('SURREAL_QUEUE_TABLE', env('DB_QUEUE_TABLE', 'jobs')), + 'queue' => env('SURREAL_QUEUE', env('DB_QUEUE', 'default')), + 'retry_after' => (int) env('SURREAL_QUEUE_RETRY_AFTER', env('DB_QUEUE_RETRY_AFTER', 90)), + 'after_commit' => false, + ], + 'beanstalkd' => [ 'driver' => 'beanstalkd', 'host' => env('BEANSTALKD_QUEUE_HOST', 'localhost'), @@ -122,7 +131,7 @@ 'failed' => [ 'driver' => env('QUEUE_FAILED_DRIVER', 'database-uuids'), - 'database' => env('DB_CONNECTION', 'sqlite'), + 'database' => env('QUEUE_FAILED_DATABASE', env('DB_CONNECTION', 'sqlite')), 'table' => 'failed_jobs', ], diff --git a/tests/Feature/SurrealQueueDriverTest.php b/tests/Feature/SurrealQueueDriverTest.php new file mode 100644 index 0000000..1c82814 --- /dev/null +++ b/tests/Feature/SurrealQueueDriverTest.php @@ -0,0 +1,247 @@ +isAvailable()) { + $this->markTestSkipped('The `surreal` CLI is not available in this environment.'); + } + + $storagePath = storage_path('app/surrealdb/queue-driver-test-'.Str::uuid()); + $completionMarker = storage_path('app/queue-driver-test-complete-'.Str::uuid().'.txt'); + $retryMarker = storage_path('app/queue-driver-test-retry-'.Str::uuid().'.txt'); + $originalDefaultConnection = config('database.default'); + $originalMigrationConnection = config('database.migrations.connection'); + $originalQueueDefault = config('queue.default'); + $originalSurrealQueueConnection = config('queue.connections.surreal'); + $originalFailedDriver = config('queue.failed.driver'); + $originalFailedDatabase = config('queue.failed.database'); + $originalFailedTable = config('queue.failed.table'); + + File::deleteDirectory($storagePath); + File::delete($completionMarker); + File::delete($retryMarker); + File::ensureDirectoryExists(dirname($storagePath)); + File::ensureDirectoryExists(dirname($completionMarker)); + + try { + $server = retryStartingSurrealQueueServer($client, $storagePath); + + config()->set('database.default', 'sqlite'); + config()->set('database.migrations.connection', null); + config()->set('surreal.host', '127.0.0.1'); + config()->set('surreal.port', $server['port']); + config()->set('surreal.endpoint', $server['endpoint']); + config()->set('surreal.username', 'root'); + config()->set('surreal.password', 'root'); + config()->set('surreal.namespace', 'katra'); + config()->set('surreal.database', 'queue_driver_test'); + config()->set('surreal.storage_engine', 'surrealkv'); + config()->set('surreal.storage_path', $storagePath); + config()->set('surreal.runtime', 'local'); + config()->set('surreal.autostart', false); + config()->set('queue.default', 'surreal'); + config()->set('queue.connections.surreal', [ + 'driver' => 'surreal', + 'table' => 'jobs', + 'queue' => 'default', + 'retry_after' => 60, + 'after_commit' => false, + ]); + config()->set('queue.failed.driver', 'database-uuids'); + config()->set('queue.failed.database', 'surreal'); + config()->set('queue.failed.table', 'failed_jobs'); + + resetSurrealQueueState(); + + expect(Artisan::call('migrate', [ + '--database' => 'surreal', + '--force' => true, + '--realpath' => true, + '--path' => database_path('migrations/0001_01_01_000002_create_jobs_table.php'), + ]))->toBe(0); + + $queue = app('queue')->connection('surreal'); + + expect($queue)->toBeInstanceOf(SurrealQueue::class); + + $queue->push(new SurrealQueueCompletingTestJob($completionMarker, 'completed')); + + $reservedJob = $queue->pop(); + + expect($reservedJob)->not->toBeNull() + ->and(DB::connection('surreal')->table('jobs')->where('id', $reservedJob->getJobId())->value('reserved_at'))->not->toBeNull(); + + $reservedJob->fire(); + $reservedJob->delete(); + + expect(File::get($completionMarker))->toBe('completed') + ->and(DB::connection('surreal')->table('jobs')->count())->toBe(0); + + $queue->push(new SurrealQueueCompletingTestJob($retryMarker, 'retried')); + + $releasedJob = $queue->pop(); + $releasedJob->release(0); + + $releasedRecord = DB::connection('surreal')->table('jobs')->orderBy('id', 'desc')->first(); + $releasedRecordData = (array) $releasedRecord; + + expect($releasedRecord)->not->toBeNull() + ->and(array_key_exists('reserved_at', $releasedRecordData))->toBeFalse() + ->and((int) $releasedRecord->attempts)->toBe(1); + + $retriedJob = $queue->pop(); + + expect($retriedJob)->not->toBeNull() + ->and($retriedJob->attempts())->toBe(2); + + $retriedJob->fire(); + $retriedJob->delete(); + + expect(File::get($retryMarker))->toBe('retried') + ->and(DB::connection('surreal')->table('jobs')->count())->toBe(0); + + $queue->push(new SurrealQueueCompletingTestJob($retryMarker, 'expired-reservation')); + + $expiredReservationJob = $queue->pop(); + + expect($expiredReservationJob)->not->toBeNull(); + + expect(DB::connection('surreal')->table('jobs')->where('id', $expiredReservationJob->getJobId())->update([ + 'reserved_at' => now()->subMinutes(5)->timestamp, + ]))->toBe(1); + + $expiredReservationRetry = $queue->pop(); + + expect($expiredReservationRetry)->not->toBeNull() + ->and($expiredReservationRetry->getJobId())->toBe($expiredReservationJob->getJobId()) + ->and($expiredReservationRetry->attempts())->toBe(2); + + $expiredReservationRetry->fire(); + $expiredReservationRetry->delete(); + + expect(File::get($retryMarker))->toBe('expired-reservation') + ->and(DB::connection('surreal')->table('jobs')->count())->toBe(0); + + $queue->push(new SurrealQueueFailingTestJob); + + Artisan::call('queue:work', [ + 'connection' => 'surreal', + '--once' => true, + '--tries' => 1, + ]); + + $failedJob = DB::connection('surreal')->table('failed_jobs')->first(); + + expect($failedJob)->not->toBeNull() + ->and($failedJob->connection)->toBe('surreal') + ->and($failedJob->queue)->toBe('default') + ->and($failedJob->exception)->toContain('Surreal queue failure.') + ->and(DB::connection('surreal')->table('jobs')->count())->toBe(0); + } finally { + config()->set('database.default', $originalDefaultConnection); + config()->set('database.migrations.connection', $originalMigrationConnection); + config()->set('queue.default', $originalQueueDefault); + config()->set('queue.connections.surreal', $originalSurrealQueueConnection); + config()->set('queue.failed.driver', $originalFailedDriver); + config()->set('queue.failed.database', $originalFailedDatabase); + config()->set('queue.failed.table', $originalFailedTable); + + resetSurrealQueueState(); + + if (isset($server['process'])) { + $server['process']->stop(1); + } + + File::deleteDirectory($storagePath); + File::delete($completionMarker); + File::delete($retryMarker); + } +}); + +function resetSurrealQueueState(): void +{ + app()->forgetInstance(SurrealConnection::class); + app()->forgetInstance(SurrealRuntimeManager::class); + DB::purge('surreal'); + app()->forgetInstance('queue'); + app()->forgetInstance('queue.connection'); + app()->forgetInstance('queue.failer'); + app()->forgetInstance('migration.repository'); + app()->forgetInstance('migrator'); +} + +/** + * @return array{endpoint: string, port: int, process: Process} + */ +function retryStartingSurrealQueueServer(SurrealCliClient $client, string $storagePath, int $attempts = 3): array +{ + $httpClient = app(SurrealHttpClient::class); + + for ($attempt = 1; $attempt <= $attempts; $attempt++) { + $port = random_int(10240, 65535); + $endpoint = sprintf('ws://127.0.0.1:%d', $port); + $process = $client->startLocalServer( + bindAddress: sprintf('127.0.0.1:%d', $port), + datastorePath: $storagePath, + username: 'root', + password: 'root', + storageEngine: 'surrealkv', + ); + + if ($httpClient->waitUntilReady($endpoint)) { + return [ + 'endpoint' => $endpoint, + 'port' => $port, + 'process' => $process, + ]; + } + + $process->stop(1); + } + + throw new RuntimeException('Unable to start the SurrealDB queue test runtime.'); +} + +class SurrealQueueCompletingTestJob implements ShouldQueue +{ + use Dispatchable; + use InteractsWithQueue; + use Queueable; + + public function __construct( + public string $path, + public string $message, + ) {} + + public function handle(): void + { + File::put($this->path, $this->message); + } +} + +class SurrealQueueFailingTestJob implements ShouldQueue +{ + use Dispatchable; + use Queueable; + + public function handle(): void + { + throw new RuntimeException('Surreal queue failure.'); + } +}