Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,24 @@ php artisan migrate --database=surreal --path=database/migrations/0001_01_01_000
- Session read, write, update, and expiry behavior are covered in the test suite against a real Surreal runtime.
- This driver intentionally follows Laravel's normal database-session lifecycle, so expiry cleanup still relies on Laravel's standard session lottery / pruning behavior instead of Surreal-native TTL features.

### Surreal-Backed Queues

Katra now also exposes a first-class `surreal` Laravel queue connection for teams that want jobs to live in SurrealDB alongside the rest of the application state.

- Set `QUEUE_CONNECTION=surreal` to use the Surreal-backed queue connector.
- The connector defaults to the `surreal` database connection, but you can override the queue connection, table, queue name, and retry window with `SURREAL_QUEUE_CONNECTION`, `SURREAL_QUEUE_TABLE`, `SURREAL_QUEUE`, and `SURREAL_QUEUE_RETRY_AFTER`.
- If you also want failed job records in SurrealDB, set `QUEUE_FAILED_DRIVER=database-uuids` and `QUEUE_FAILED_DATABASE=surreal`.
- Make sure the queue tables exist on the Surreal connection before you start a worker. Katra's queue migration also creates `job_batches` and `failed_jobs` alongside `jobs`:

```bash
php artisan migrate --database=surreal --path=database/migrations/0001_01_01_000002_create_jobs_table.php
```

- Start a worker against the Surreal connection with `php artisan queue:work surreal`.
- Queue enqueue, reserve, complete, retry, and failed-job behavior are covered in the test suite against a real Surreal runtime.
- This connector currently uses optimistic job reservation instead of SQL row locks and database transactions, so it is best suited to Katra's current low-contention worker model rather than high-volume multi-worker contention scenarios.
- `after_commit` is not supported on the Surreal queue connection because the Surreal database connection does not expose SQL transaction semantics.

## Planning Docs

- [Katra v2 Overview](docs/v2-overview.md)
Expand Down
8 changes: 8 additions & 0 deletions app/Providers/AppServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
namespace App\Providers;

use App\Services\Surreal\Migrations\SurrealMigrationRepository;
use App\Services\Surreal\Queue\SurrealQueueConnector;
use App\Services\Surreal\Schema\SurrealSchemaConnection;
use App\Services\Surreal\SurrealConnection;
use App\Services\Surreal\SurrealDocumentStore;
use App\Services\Surreal\SurrealHttpClient;
use App\Services\Surreal\SurrealRuntimeManager;
use Illuminate\Database\DatabaseManager;
use Illuminate\Queue\QueueManager;
use Illuminate\Session\DatabaseSessionHandler;
use Illuminate\Support\ServiceProvider;

Expand Down Expand Up @@ -50,5 +52,11 @@ public function boot(): void
$app,
);
});

$this->app->afterResolving('queue', function (QueueManager $manager): void {
$manager->addConnector('surreal', function (): SurrealQueueConnector {
return new SurrealQueueConnector($this->app['db']);
});
});
}
}
112 changes: 112 additions & 0 deletions app/Services/Surreal/Queue/SurrealQueue.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
<?php

namespace App\Services\Surreal\Queue;

use Illuminate\Queue\DatabaseQueue;
use Illuminate\Queue\Jobs\DatabaseJob;
use Illuminate\Queue\Jobs\DatabaseJobRecord;
use Illuminate\Support\Carbon;
use stdClass;

class SurrealQueue extends DatabaseQueue
{
public function pop($queue = null): ?DatabaseJob
{
$queue = $this->getQueue($queue);

foreach ($this->nextAvailableJobs($queue) as $jobRecord) {
$reservedJob = $this->attemptToReserve($jobRecord);

if ($reservedJob !== null) {
return new DatabaseJob(
$this->container,
$this,
$reservedJob,
$this->connectionName,
$queue,
);
}
}

return null;
}

public function deleteReserved($queue, $id): void
{
$this->database->table($this->table)
->where('id', $id)
->delete();
}

public function deleteAndRelease($queue, $job, $delay): void
{
$this->deleteReserved($queue, $job->getJobId());

$this->release($queue, $job->getJobRecord(), $delay);
}

/**
* @return list<DatabaseJobRecord>
*/
private function nextAvailableJobs(string $queue): array
{
$expiration = Carbon::now()->subSeconds($this->retryAfter)->getTimestamp();

return $this->database->table($this->table)
->where('queue', $queue)
->where(function ($query) use ($expiration): void {
$query->where(function ($query): void {
$query->whereNull('reserved_at')
->where('available_at', '<=', $this->currentTime());
})->orWhere(function ($query) use ($expiration): void {
$query->whereNotNull('reserved_at')
->where('reserved_at', '<=', $expiration);
});
})
->orderBy('id', 'asc')
->limit(5)
->get()
->map(static function (stdClass $record): DatabaseJobRecord {
$record->reserved_at ??= null;

return new DatabaseJobRecord($record);
})
->all();
}

private function attemptToReserve(DatabaseJobRecord $job): ?DatabaseJobRecord
{
$reservedAt = $this->currentTime();
$attempts = $job->attempts + 1;
$existingReservedAt = $job->reserved_at ?? null;

$query = $this->database->table($this->table)
->where('id', $job->id);

if ($existingReservedAt === null) {
$query->whereNull('reserved_at')
->where('available_at', '<=', $reservedAt);
} else {
$query->where('reserved_at', $existingReservedAt);
}

$updated = $query->update([
'reserved_at' => $reservedAt,
'attempts' => $attempts,
]);

if ($updated !== 1) {
return null;
}

return new DatabaseJobRecord((object) [
'id' => $job->id,
'queue' => $job->queue,
'payload' => $job->payload,
'attempts' => $attempts,
'reserved_at' => $reservedAt,
'available_at' => $job->available_at,
'created_at' => $job->created_at,
]);
}
}
24 changes: 24 additions & 0 deletions app/Services/Surreal/Queue/SurrealQueueConnector.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php

namespace App\Services\Surreal\Queue;

use Illuminate\Database\ConnectionResolverInterface;
use Illuminate\Queue\Connectors\ConnectorInterface;

class SurrealQueueConnector implements ConnectorInterface
{
public function __construct(
private readonly ConnectionResolverInterface $connections,
) {}

public function connect(array $config): SurrealQueue
{
return new SurrealQueue(
$this->connections->connection($config['connection'] ?? 'surreal'),
$config['table'],
$config['queue'],
$config['retry_after'] ?? 60,
$config['after_commit'] ?? false,
);
}
}
13 changes: 11 additions & 2 deletions config/queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
| used by your application. An example configuration is provided for
| each backend supported by Laravel. You're also free to add more.
|
| Drivers: "sync", "database", "beanstalkd", "sqs", "redis",
| Drivers: "sync", "database", "surreal", "beanstalkd", "sqs", "redis",
| "deferred", "background", "failover", "null"
|
*/
Expand All @@ -44,6 +44,15 @@
'after_commit' => false,
],

'surreal' => [
'driver' => 'surreal',
'connection' => env('SURREAL_QUEUE_CONNECTION', 'surreal'),
'table' => env('SURREAL_QUEUE_TABLE', env('DB_QUEUE_TABLE', 'jobs')),
'queue' => env('SURREAL_QUEUE', env('DB_QUEUE', 'default')),
'retry_after' => (int) env('SURREAL_QUEUE_RETRY_AFTER', env('DB_QUEUE_RETRY_AFTER', 90)),
'after_commit' => false,
],

'beanstalkd' => [
'driver' => 'beanstalkd',
'host' => env('BEANSTALKD_QUEUE_HOST', 'localhost'),
Expand Down Expand Up @@ -122,7 +131,7 @@

'failed' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'database-uuids'),
'database' => env('DB_CONNECTION', 'sqlite'),
'database' => env('QUEUE_FAILED_DATABASE', env('DB_CONNECTION', 'sqlite')),
'table' => 'failed_jobs',
],

Expand Down
Loading
Loading