Provides async queues implementation for Symfony (using mongodb as main storage).
sfcod_queue:
connections:
default: { driver: 'mongo-thread', collection: 'queue_jobs', queue: 'default', expire: 60, limit: 2 }
namespaces:
- 'App\Job' OR instead of namespaces you can use instanceof or direct service autowiring
services:
# _instanceof:
# SfCod\QueueBundle\Base\JobInterface:
# tags: ['sfcod.jobqueue.job']
App\Job\:
resource: '../src/Job/*'
tags: ['sfcod.jobqueue.job']namespaces - is not required. You can set here all namespace where your job classes are, otherwise all services with tag 'sfcod.jobqueue.job' will be fetched from symfony container. connection - is not required. If it is not set, bundle will use this service SfCod\QueueBundle\Base\MongoDriverInterface::class as default.
Create your own handler which implements SfCod\QueueBundle\Base\JobInterface
$jobQueue->push(your_job_handler_service, $data);$data - additional data for your job
Run worker daemon with console command:
$ php bin/console job-queue:work
$ php bin/console job-queue:retry --id=<Job ID>
$ php bin/console job-queue:run-job <Job ID>Where:
- work - command to run daemon in loop;
- retry - command to move all failed jobs back into queue, can be used with --id param to retry only single job
- run-job - command to run single job by id
'job_queue_worker.raise_before_job': SfCod\QueueBundle\Event\JobProcessingEvent;
'job_queue_worker.raise_after_job': SfCod\QueueBundle\Event\JobProcessedEvent;
'job_queue_worker.raise_exception_occurred_job': SfCod\QueueBundle\Event\JobExceptionOccurredEvent;
'job_queue_worker.raise_failed_job': SfCod\QueueBundle\Event\JobFailedEvent;
'job_queue_worker.stop': SfCod\QueueBundle\Event\WorkerStoppingEvent;SfCod\QueueBundle\Service\JobQueue:
public: true
arguments:
- '@SfCod\QueueBundle\Service\QueueManager'SfCod\QueueBundle\Service\JobQueue: main job queue service.
SfCod\QueueBundle\Service\QueueManager:
calls:
- [addConnector, ['mongo-thread', '@SfCod\QueueBundle\Connector\ConnectorInterface']]SfCod\QueueBundle\Service\QueueManager: queue manager which holds connectors and connections.
SfCod\QueueBundle\Worker\Worker:
arguments:
- '@SfCod\QueueBundle\Service\QueueManager'
- '@SfCod\QueueBundle\Service\JobProcess'
- '@SfCod\QueueBundle\Failer\FailedJobProviderInterface'
- '@SfCod\QueueBundle\Handler\ExceptionHandlerInterface'
- '@Symfony\Component\EventDispatcher\EventDispatcherInterface'SfCod\QueueBundle\Worker\Worker: main worker service.
SfCod\QueueBundle\Service\JobProcess:
arguments:
- 'console'
- '%kernel.project_dir%/bin'
- 'php'
- ''SfCod\QueueBundle\Service\JobProcess: default config for jobs command processor in async queues, where:
- 'console' - name of console command
- '%kernel.project_dir%/bin' - path for console command
- 'php' - binary script
- '' - binary script arguments
SfCod\QueueBundle\Connector\ConnectorInterface:
class: SfCod\QueueBundle\Connector\MongoConnector
arguments:
- '@SfCod\QueueBundle\Base\JobResolverInterface'
- '@SfCod\QueueBundle\Base\MongoDriverInterface'SfCod\QueueBundle\Connector\ConnectorInterface: connector for queues' database.
SfCod\QueueBundle\Base\JobResolverInterface:
class: SfCod\QueueBundle\Service\JobResolver
arguments:
- '@Symfony\Component\DependencyInjection\ContainerInterface'SfCod\QueueBundle\Base\JobResolverInterface: resolver for jobs, it builds job using job's display name, for default jobs fetches from container as a public services.
SfCod\QueueBundle\Failer\FailedJobProviderInterface:
class: SfCod\QueueBundle\Failer\MongoFailedJobProvider
arguments:
- '@SfCod\QueueBundle\Base\MongoDriverInterface'
- 'queue_jobs_failed'SfCod\QueueBundle\Failer\FailedJobProviderInterface: failer service for failed jobs processing, where:
- SfCod\QueueBundle\Base\MongoDriverInterface - mongo driver
- 'queue_jobs_failed' - name of mongo collection
SfCod\QueueBundle\Handler\ExceptionHandlerInterface:
class: SfCod\QueueBundle\Handler\ExceptionHandler
arguments:
- '@Psr\Log\LoggerInterface'SfCod\QueueBundle\Handler\ExceptionHandlerInterface: main exception handler, used for logging issues
SfCod\QueueBundle\Base\MongoDriverInterface:
class: SfCod\QueueBundle\Service\MongoDriver
calls:
- [setCredentials, ['%env(MONGODB_URL)%']]
- [setDbname, ['%env(MONGODB_NAME)%']]SfCod\QueueBundle\Base\MongoDriverInterface: default config for mongo driver connection
If you want to change default connector, you can override SfCod\QueueBundle\Connector\ConnectorInterface or add method call:
SfCod\QueueBundle\Service\QueueManager:
calls:
- [addConnector, ['your-connector', '@your.service']]where 'your.service' must implement SfCod\QueueBundle\Connector\ConnectorInterface and then all your connections with driver 'your-connector' will be processed using new connector, for example:
sfcod_queue:
connections:
default: { driver: 'your-connector', collection: 'queue_jobs' queue: 'default', expire: 60, limit: 2 }You can run tests using prepared configuration xml file:
php bin/phpunit --configuration ./vendor/sfcod/jobqueue/phpunit.xml.dist --bootstrap ./vendor/autoload.php