1. Go to this page and download the library: Download leadtech/boot-rabbit-mq library. Choose the download type require.
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
leadtech / boot-rabbit-mq example snippets
// Autoload dependencies
ultTolerantBehaviour;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Boot\RabbitMQ\Connection\AMQPConnection;
use Boot\RabbitMQ\Consumer\AbstractConsumer;
use Boot\RabbitMQ\RabbitMQ;
use Boot\RabbitMQ\Consumer\Event\ConsumerSuccessEvent;
use Boot\RabbitMQ\Consumer\Event\ReceiveEvent;
class ExampleConsumer extends AbstractConsumer
{
/**
* @param \PhpAmqpLib\Message\AMQPMessage $message
* @return bool
*/
public function handle(\PhpAmqpLib\Message\AMQPMessage $message)
{
echo "Received message #{$message->body['sequence_number']}\n";
// Return true for success, an ACK signal is sent to the server.
// Alternatively an exception or returning false will result in a NACK signal instead.
return true;
}
}
// Create event dispatcher (optional)
$eventDispatcher = new EventDispatcher();
// Create queue template
$queueTemplate = new \Boot\RabbitMQ\Template\QueueTemplate(
'some_queue_name',
new AMQPConnection('localhost', 5672, 'guest', 'guest'),
new FaultTolerantBehaviour,
$eventDispatcher
);
$queueTemplate->setExclusive(false);
$eventDispatcher->addListener(RabbitMQ::ON_RECEIVE, function(ReceiveEvent $event){
echo "Receiving a new message. Sequence number: {$event->getMessage()->body['sequence_number']}\n";
});
$eventDispatcher->addListener(RabbitMQ::ON_CONSUMER_SUCCESS, function(ConsumerSuccessEvent $event){
echo "Successfully processed message. Sequence number: {$event->getMessage()->body['sequence_number']}\n\n";
});
$consumer = new ExampleConsumer($queueTemplate);
$consumer->connect();
$consumer->listen();
while($consumer->isBusy()) {
$consumer->wait();
}
// Autoload dependencies
ltTolerantBehaviour;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Boot\RabbitMQ\Connection\AMQPConnection;
use Boot\RabbitMQ\Producer\Producer;
$eventDispatcher = new EventDispatcher();
$queueTemplate = new \Boot\RabbitMQ\Template\QueueTemplate(
'some_queue_name',
new AMQPConnection('localhost', 5672, 'guest', 'guest'),
new FaultTolerantBehaviour
);
$queueTemplate->setExclusive(false);
$producer = new Producer($queueTemplate);
$producer->connect();
for($i=0;$i<=10;$i++) {
$producer->publish([
'sequence_number' => time() . '-' . $i
]);
}
/**
* Class SomeCustomBehaviour
* @package Boot\RabbitMQ\Strategy
*/
class SomeCustomBehaviour extends QueueStrategy
{
/**
* @param QueueTemplate $queueTemplate
* @param array $data
*
* @return AMQPMessage
*/
public function createMessage(QueueTemplate $queueTemplate, array $data)
{
return new AMQPMessage(
$queueTemplate->getSerializer()->serialize($data)
);
}
/**
* @param QueueTemplate $queueTemplate
*/
public function declareQueue(QueueTemplate $queueTemplate)
{
// ...
}
/**
* @param QueueTemplate $queueTemplate
*/
public function declareQualityOfService(QueueTemplate $queueTemplate)
{
// ...
}
/**
* Whether an (n)ack signal must be sent to the server. Depending on the setup this may or may not happen automatically.
*
* @return bool
*/
public function doAckManually()
{
// ...
}
}
use Boot\RabbitMQ\RabbitMQ;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Boot\RabbitMQ\Consumer\Event\ReceiveEvent
$eventDispatcher = new EventDispatcher();
// ON_RECEIVE
$eventDispatcher->addListener(RabbitMQ::ON_RECEIVE, function(ReceiveEvent $event){
var_dump($event->getMessage());
var_dump($event->getConsumer());
});
// ON_CONSUMER_ERROR
$eventDispatcher->addListener(RabbitMQ::ON_CONSUMER_ERROR, function(ReceiveEvent $event){
// ...
});
// ON_CONSUMER_SUCCESS
$eventDispatcher->addListener(RabbitMQ::ON_CONSUMER_SUCCESS, function(ReceiveEvent $event){
// ...
});