PHP code example of cyberclick-os / rabbitmq-bundle
1. Go to this page and download the library: Download cyberclick-os/rabbitmq-bundle library. Choose the download type require.
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
// app/AppKernel.php
public function registerBundles()
{
$bundles = array(
new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle(),
);
}
use OldSound\RabbitMqBundle\DependencyInjection\OldSoundRabbitMqExtension;
use OldSound\RabbitMqBundle\DependencyInjection\Compiler\RegisterPartsPass;
// ...
$containerBuilder->registerExtension(new OldSoundRabbitMqExtension());
$containerBuilder->addCompilerPass(new RegisterPartsPass());
class ConnectionParametersProviderService implements ConnectionParametersProvider {
...
public function getConnectionParameters() {
return array('vhost' => $this->getVhost());
}
...
}
public function indexAction($name)
{
$msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png');
$this->get('old_sound_rabbit_mq.upload_picture_producer')->publish(serialize($msg));
}
class OnConsumeEvent extends AMQPEvent
{
const NAME = AMQPEvent::ON_CONSUME;
/**
* OnConsumeEvent constructor.
*
* @param Consumer $consumer
*/
public function __construct(Consumer $consumer)
{
$this->setConsumer($consumer);
}
}
class BeforeProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent::BEFORE_PROCESSING_MESSAGE;
/**
* BeforeProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct(Consumer $consumer, AMQPMessage $AMQPMessage)
{
$this->setConsumer($consumer);
$this->setAMQPMessage($AMQPMessage);
}
}
class AfterProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent::AFTER_PROCESSING_MESSAGE;
/**
* AfterProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct(Consumer $consumer, AMQPMessage $AMQPMessage)
{
$this->setConsumer($consumer);
$this->setAMQPMessage($AMQPMessage);
}
}
class OnIdleEvent extends AMQPEvent
{
const NAME = AMQPEvent::ON_IDLE;
/**
* OnIdleEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct(Consumer $consumer)
{
$this->setConsumer($consumer);
$this->forceStop = true;
}
}
//src/Acme/DemoBundle/Consumer/UploadPictureConsumer.php
namespace Acme\DemoBundle\Consumer;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
class UploadPictureConsumer implements ConsumerInterface
{
public function execute(AMQPMessage $msg)
{
//Process picture upload.
//$msg will be an instance of `PhpAmqpLib\Message\AMQPMessage` with the $msg->body being the data sent over RabbitMQ.
$isUploadSuccess = someUploadPictureMethod();
if (!$isUploadSuccess) {
// If your image upload failed due to a temporary error you can return false
// from your callback so the message will be rejected by the consumer and
// requeued by RabbitMQ.
// Any other value not equal to false will acknowledge the message and remove it
// from the queue
return false;
}
}
}
public function indexAction($name)
{
$client = $this->get('old_sound_rabbit_mq.integer_store_rpc');
$client->addRequest(serialize(array('min' => 0, 'max' => 10)), 'random_int', 'request_id');
$replies = $client->getReplies();
}
namespace AppBundle\Service;
use OldSound\RabbitMqBundle\RabbitMq\BatchConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute(array $messages)
{
echo sprintf('Doing batch execution%s', PHP_EOL);
foreach ($messages as $message) {
$this->executeSomeLogicPerMessage($message);
}
// you ack all messages got in batch
return true;
}
}
namespace AppBundle\Service;
use OldSound\RabbitMqBundle\RabbitMq\BatchConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute(array $messages)
{
echo sprintf('Doing batch execution%s', PHP_EOL);
$result = [];
/** @var AMQPMessage $message */
foreach ($messages as $message) {
$result[(int)$message->delivery_info['delivery_tag']] = $this->executeSomeLogicPerMessage($message);
}
// you ack only some messages that have return true
// e.g:
// $return = [
// 1 => true,
// 2 => true,
// 3 => false,
// 4 => true,
// 5 => -1,
// 6 => 2,
// ];
// The following will happen:
// * ack: 1,2,4
// * reject and requeq: 3
// * nack and requeue: 6
// * reject and drop: 5
return $result;
}
}
bash
$ composer
{
" "php-amqplib/rabbitmq-bundle": "~1.6",
}
}
Loading please wait ...
Before you can download the PHP files, the dependencies should be resolved. This can take some minutes. Please be patient.