printed/rabbitmq-queue-bundle
A RabbitMQ wrapper bundle for Symfony that aims to improve your experience with consumers and producers.
The bundle builds on top of the php-amqplib/rabbitmq-bundle bundle.
Notable features
- Cancellable queue tasks
- Queue tasks completeness progress tracking
- An opinionated, zero-downtime deployment procedure, focused on supervisord
- (via php-amqplib/rabbitmq-bundle) Queue consumer's graceful max execution time
- Queue tasks reattempts
Setup & Dependencies
- PHP >=7.0
- https://packagist.org/packages/symfony/symfony ^3.4|^4.0
- https://packagist.org/packages/doctrine/orm ~2.5
- https://packagist.org/packages/monolog/monolog ~1.11
- https://packagist.org/packages/ramsey/uuid ~3.4
- https://packagist.org/packages/php-amqplib/rabbitmq-bundle ~1.6
We assume that you are familiar with the configuration and setup of php-amqplib/rabbitmq-bundle.
Special note for memcached users
Please open the vendor/doctrine/cache/lib/Doctrine/Common/Cache/MemcachedCache.php file in your project and see whether you can find the following piece of code:
If you do, then consider upgrading doctrine/cache to at least version 1.7.0. Otherwise, CacheQueueMaintenanceStrategy might report that the maintenance mode is up whenever there are connection issues with your memcached server.
Bundle configuration
rabbitmq-queue-bundle.default_rabbitmq_producer_name
You are expected to have at least one RabbitMQ producer with the following config:
The value of service_alias should be provided in the rabbitmq-queue-bundle.default_rabbitmq_producer_name parameter.
That config creates a RabbitMQ producer that dispatches tasks to the appropriate queues by queue name. In RabbitMQ terms, this is publishing through the "default" exchange.
Important notice: Use a dedicated EntityManager for your consumers.
AbstractQueueConsumer should ideally use a dedicated, separate EntityManager to process QueueTasks. This prevents problems with flushing changes to the QueueTasks when the application's default EntityManager has stopped being operational (e.g. due to the "Is already closed" error, or because the entity manager's unit of work has been cleared).
To achieve this, use a multi entity manager configuration for the DoctrineBundle, and then supply the dedicated entity manager to the constructor of your consumers.
Example configuration:
Usage
Assuming you have configured the bundles above and are running a basic Symfony demo application, the steps below should be fairly easy to follow. Adapt them where needed; the examples are only there to get the point across.
Example rabbitmq.yml config
Monolog
The consumer exposes a $this->logger property, which is a monolog/monolog logger registered with Symfony under the queue channel.
Feel free to handle the channel as you wish, but at the very least channels: [queue] must be added to your monolog config.
Producing (Payloads)
We use classes known as payloads to contain data for the consumer (also known as the worker).
When a payload is constructed and given to the dispatcher, it is first validated using symfony/validator, then serialised and stored in the database.
The newly created record's ID is given to the RabbitMQ queue with the exchange defined in the abstract method getExchangeName.
This gives you an easy interface for dispatching payloads without having to worry about the destination or keeping code up to date when exchange names change.
To dispatch this payload, use the dispatcher class made available by this bundle. You can fetch it from the container under the service ID printed.bundle.queue.service.queue_task_dispatcher. For example:
Consuming (Workers)
As mentioned in php-amqplib/rabbitmq-bundle, your consumers need to be registered as services and given to the consumer definitions in the configuration. It's exactly the same here, but we need a few arguments that are common across all consumers.
Note that the container.service_subscriber tag is required for Symfony's ServiceSubscriber feature.
Then your worker will look very familiar, with just a few changes and enhancements:
A task must always exit with a boolean value: true means it passed and false means it failed. To make this more explicit, you have access to the constants TASK_COMPLETE and TASK_FAILED, which are simply those boolean values under the bonnet. When a task is marked as failed, its attempts count is incremented before it is given back to the queue for another attempt. Each task gets an attempt limit specified by the getAttemptLimit() method in the consumer. Feel free to override it; by default the limit is set to 10. In those rare cases where you know the task will fail forever and you do not want it to use up its remaining attempts, you can throw the QueueFatalErrorException exception. The message will be logged to the usual logs and the task's attempts will be maxed out, which prevents the job from spawning any more attempts.
Running these consumers works the same way as in the bundle mentioned above: ./bin/console rabbitmq:consumer example_consumer -vv
Maintenance and deployment
In order to gracefully bring down your queue or halt its progress we have the following console commands defined. This will allow for easy maintenance of the queue consumers/workers.
In essence, queue:maintenance:up will prevent any new jobs from being processed by the workers. When a new job is delivered to a worker while the maintenance mode is up, the worker will immediately exit with code 0. This is helpful if you are running the workers with something like supervisord, because supervisord by default doesn't restart programs that exit with that status.
It's important to understand that the primary purpose of queue:maintenance:up is to prevent new jobs from being processed. That command is not for stopping/restarting workers (although that is effectively what happens most of the time). Please make use of the New Deployments Detection feature described below to restart the workers.
The queue:maintenance:wait command polls the database for running tasks and only exits when none of the tasks are marked as running.
The -r parameter lets you configure the interval, in seconds, between polls of the database.
Also, as it's a Symfony command, feel free to squelch the output with -q in your deployment tools.
Then, of course, queue:maintenance:down will allow the queues to run again, although you will need to restart the workers manually because they will have exited.
Before you disable the maintenance mode, you need to make sure all old idle workers are restarted. There are many ways of doing this. This bundle provides a way for all workers to quit when they detect that a new deployment has happened. This feature is called "New Deployments Detection" and requires you to call the queue:store-new-deployment-stamp-command with a string that can be used to compare deployments (a timestamp is generally enough). Make sure you configure this bundle to actually use this feature.
Essentially you might have a build script looking like this:
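A minimal sketch of such a script, following the order of steps described above. The function names, the `CONSOLE` variable and the `release_code` placeholder are hypothetical, and passing the deployment stamp as a positional argument is an assumption; check the command's `--help` output before relying on it.

```shell
#!/usr/bin/env bash
# Sketch of a deployment wrapper around the bundle's maintenance commands.
# Abort on the first failing step so the queue is not re-enabled by mistake.
set -e

# Path to the Symfony console (assumption: adjust to your project layout).
CONSOLE="${CONSOLE:-./bin/console}"

# Hypothetical placeholder: replace with your real release steps
# (code sync, cache warmup, migrations, ...).
release_code() {
  echo "release_code: replace this placeholder with your deployment steps"
}

deploy_with_queue_maintenance() {
  # Deployment stamp: any string that differs between deployments.
  local stamp="${1:-$(date +%s)}"

  # 1. Stop workers from processing new jobs.
  $CONSOLE queue:maintenance:up
  # 2. Block until no task is marked as running (-q squelches the output).
  $CONSOLE queue:maintenance:wait -q
  # 3. Ship the new code.
  release_code
  # 4. Record the new deployment so idle workers from the old release quit.
  #    Assumption: the stamp is accepted as a positional argument.
  $CONSOLE queue:store-new-deployment-stamp-command "$stamp"
  # 5. Allow jobs to be processed again.
  $CONSOLE queue:maintenance:down
}
```

From a CI job this could be invoked as `deploy_with_queue_maintenance "$(date +%s)"`. The stamp is stored before the maintenance mode is taken down, so that old idle workers are told to quit before new jobs start flowing again.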
As the tasks are stored in the database with all their payload data, it is possible to spawn all the tasks again. This is handy if you need to upgrade or migrate your RabbitMQ instance. At this time we have not created a command for this (sorry), but it is easy enough to do: replicate the process you use to spawn a task initially, looping over all the entries in the database marked as pending (status = 1). See the QueueTaskInterface for more information.
Tips
RabbitMQ vhosts
If you use one RabbitMQ server to host queues for multiple deployments of your app (e.g. hosting test, staging and regression environments all on one RabbitMQ server), this feature lets you do that without running into queue name conflicts.
To make use of it, set the following options:
- old_sound_rabbit_mq.connections.default.vhost
- printedcom_rabbitmq_queue_bundle.rabbitmq_vhost
Supervisord process groups
This allows you to collect all queue consumers in one supervisord process group. You can then start and stop them by that group name, and you can run queue consumers from multiple app environments on one machine without running into name conflicts.
It's also a cleaner way to stop your consumers, deploy your code, reread the supervisord config and start your consumers. This approach replaces the "new deployment detection" feature of this bundle.
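A rough sketch of that stop, deploy, reread, start sequence using supervisorctl. The group name, the `SUPERVISORCTL` variable, the function name and the `release_code` placeholder are hypothetical; the `update` step is an addition on top of the steps listed above, included because `reread` on its own only detects config changes without applying them.

```shell
#!/usr/bin/env bash
# Sketch: restart a supervisord process group around a deployment.
set -e

# Hypothetical group name; use the [group:x] name from your supervisord config.
SUPERVISOR_GROUP="${SUPERVISOR_GROUP:-myapp_staging_queue}"
SUPERVISORCTL="${SUPERVISORCTL:-supervisorctl}"

# Hypothetical placeholder: replace with your real release steps.
release_code() {
  echo "release_code: replace this placeholder with your deployment steps"
}

restart_consumers_around_deploy() {
  # Stop every process in the group before touching the code.
  $SUPERVISORCTL stop "${SUPERVISOR_GROUP}:*"
  release_code
  # Pick up changed supervisord config files, then apply the changes.
  $SUPERVISORCTL reread
  $SUPERVISORCTL update
  # Start the consumers again on the new code.
  $SUPERVISORCTL start "${SUPERVISOR_GROUP}:*"
}
```

Giving each environment its own group name (for example one per test, staging or regression deployment) keeps the `stop` and `start` calls scoped to that environment's consumers.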
Queue tasks priorities
RabbitMQ offers a way to distribute and process queue tasks with different priorities. This bundle is aware of this feature and offers you a way to make use of it. See the $queueMessageProperties argument of AbstractQueuePayload::__construct().
One reason this feature is useful is the situation where you find that you have n consumers mostly doing nothing but still occupying the server's resources (RAM). You can reorganise your code to use a single queue consumer listening to a single, shared queue and processing queue tasks that vary in priority. This bundle doesn't offer any opinion on such an implementation; however, a shared queue with 5 priorities (lowest, low, normal, high, highest) is a sound setup.
Tests & Contribution
Sadly there are no tests right now, as there isn't much to actually test. But if you have anything to contribute, then please open a pull request. Just keep the code clean!
All versions of rabbitmq-queue-bundle with dependencies
symfony/config Version ^3.4 || ^4.0
symfony/console Version ^3.4 || ^4.0
symfony/dependency-injection Version ^3.4 || ^4.0
symfony/expression-language Version ^3.4 || ^4.0
symfony/filesystem Version ^3.4 || ^4.0
symfony/http-foundation Version ^3.4 || ^4.0
symfony/http-kernel Version ^3.4 || ^4.0
symfony/validator Version ^3.4 || ^4.0
doctrine/cache Version ^1.6
doctrine/doctrine-bundle Version ~1.4
doctrine/orm Version ~2.5
monolog/monolog Version ~1.11
php-amqplib/rabbitmq-bundle Version ^1.13.0
ramsey/uuid Version ~3.4
php-http/guzzle6-adapter Version ^1.1
richardfullmer/rabbitmq-management-api Version ^2.0
psr/log Version ^1.0