<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
// mhujer / rabbit-mq-consumer-handler-bundle example snippets
declare(strict_types = 1);
namespace Example;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use VasekPurchart\RabbitMqConsumerHandlerBundle\ConsumerHandler\ConsumerHandler;
/**
 * Minimal example consumer.
 *
 * Hands every incoming message to the injected ConsumerHandler by wrapping
 * the actual work in a closure passed to ConsumerHandler::processMessage().
 */
class ExampleConsumer implements ConsumerInterface
{
    /** @var ConsumerHandler handler that runs the processing callback */
    private $consumerHandler;

    /**
     * @param ConsumerHandler $consumerHandler handler used by execute()
     */
    public function __construct(ConsumerHandler $consumerHandler)
    {
        $this->consumerHandler = $consumerHandler;
    }

    /**
     * Processes one message through the consumer handler.
     *
     * @param AMQPMessage $message message delivered by the queue
     * @return int whatever processMessage() returns for the callback below
     */
    public function execute(AMQPMessage $message): int
    {
        $processing = function () use ($message): int {
            $payload = $message->body;

            // Put the real work on $payload here - anything you would normally
            // write in a consumer, minus the parts the bundle handles for you.

            return ConsumerInterface::MSG_ACK;
        };

        return $this->consumerHandler->processMessage($processing);
    }
}
declare(strict_types = 1);
namespace Example;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LogLevel;
use VasekPurchart\RabbitMqConsumerHandlerBundle\ConsumerHandler\ConsumerHandler;
/**
 * Example consumer showing per-exception handling inside the callback
 * passed to ConsumerHandler::processMessage().
 *
 * Each catch branch returns one of the ConsumerInterface::MSG_* result codes,
 * or rethrows so the handler's default "catchall" deals with the exception.
 */
class ExampleConsumer implements \OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface
{
    /** @var \VasekPurchart\RabbitMqConsumerHandlerBundle\ConsumerHandler\ConsumerHandler */
    private $consumerHandler;

    /**
     * @param ConsumerHandler $consumerHandler handler used by execute()
     */
    public function __construct(
        ConsumerHandler $consumerHandler
    )
    {
        $this->consumerHandler = $consumerHandler;
    }

    /**
     * Processes one message through the consumer handler.
     *
     * @param AMQPMessage $message message delivered by the queue
     * @return int whatever processMessage() returns for the callback below
     */
    public function execute(AMQPMessage $message): int
    {
        return $this->consumerHandler->processMessage(
            function (ConsumerHandler $consumerHandler) use ($message): int {
                // the correct ConsumerHandler is passed into the callback,
                // so you can use it for custom logging etc.
                try {
                    $data = $message->body;
                    // ...
                    return ConsumerInterface::MSG_ACK;
                } catch (\ResourceNotFound $e) {
                    // might be caused by the asynchronous nature of message queues
                    // - a resource might not yet be accessible
                    // or it might have been deleted already
                    // basically you can choose if this is OK (ACK or REJECT,
                    // depending on semantics), or if you want to try later
                    // again (REJECT_REQUEUE)
                    // Note: the constant is MSG_REJECT; ConsumerInterface has
                    // no plain REJECT constant.
                    return ConsumerInterface::MSG_REJECT;
                } catch (\UnexpectedBusinessLogicException $e) {
                    // situation where you are not sure why it happens,
                    // but you need to investigate further (perhaps with more logging)
                    // and perhaps throw away these messages, because
                    // they might clutter the queue
                    $consumerHandler->log(LogLevel::ERROR, 'My custom message');
                    $consumerHandler->logException($e);
                    return ConsumerInterface::MSG_REJECT;
                } catch (\UnexpectedException $e) {
                    // situation where you might need to decide further
                    // what to do in the catch block
                    if ($e->getCode() === 123) {
                        return ConsumerInterface::MSG_REJECT;
                    }
                    throw $e; // handle with default "catchall"
                } catch (\ExpectedBusinessLogicException $e) {
                    // situation where you can solve it in a different way
                    // call a service
                    return ConsumerInterface::MSG_ACK;
                } catch (\ConnectionTimeoutCustomException $e) {
                    // situation where the application would need a restart
                    // to reinitialize for example a connection
                    // this would happen also by default in the "catchall",
                    // but you might want to handle a specific case separately,
                    // for example not to log these exceptions
                    // this will stop the consumer
                    $consumerHandler->stopConsumer('Connection timeout');
                    return ConsumerInterface::MSG_REJECT_REQUEUE;
                }
            }
        );
    }
}