PHP code example of php-amqplib / rabbitmq-bundle

1. Go to this page and download the library: Download php-amqplib/rabbitmq-bundle library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

php-amqplib / rabbitmq-bundle example snippets


$msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png');
$this->get('old_sound_rabbit_mq.upload_picture_producer')->publish(serialize($msg));

// app/AppKernel.php

public function registerBundles()
{
    $bundles = array(
        new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle(),
    );
}

use OldSound\RabbitMqBundle\DependencyInjection\OldSoundRabbitMqExtension;
use OldSound\RabbitMqBundle\DependencyInjection\Compiler\RegisterPartsPass;

// ...

$containerBuilder->registerExtension(new OldSoundRabbitMqExtension());
$containerBuilder->addCompilerPass(new RegisterPartsPass());

class ConnectionParametersProviderService implements ConnectionParametersProvider {
    ...
    public function getConnectionParameters() {
        return array('vhost' => $this->getVhost());
    }
    ...
}

public function indexAction($name)
{
    $msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png');
    $this->get('old_sound_rabbit_mq.upload_picture_producer')->publish(serialize($msg));
}

$this->get('old_sound_rabbit_mq.upload_picture_producer')->setContentType('application/json');

namespace App\EventListener;

use OldSound\RabbitMqBundle\Event\BeforeProducerPublishMessageEvent;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;

#[AsEventListener(event: BeforeProducerPublishMessageEvent::NAME)]
final class AMQPBeforePublishEventListener
{
    public function __invoke(BeforeProducerPublishMessageEvent $event): void
    {
        // Your code goes here
    }
}

namespace App\EventListener;

use OldSound\RabbitMqBundle\Event\AfterProducerPublishMessageEvent;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;

#[AsEventListener(event: AfterProducerPublishMessageEvent::NAME)]
final class AMQPBeforePublishEventListener
{
    public function __invoke(AfterProducerPublishMessageEvent $event): void
    {
        // Your code goes here
    }
}

class OnConsumeEvent extends AMQPEvent
{
    const NAME = AMQPEvent::ON_CONSUME;

    /**
     * OnConsumeEvent constructor.
     *
     * @param Consumer $consumer
     */
    public function __construct(Consumer $consumer)
    {
        $this->setConsumer($consumer);
    }
}

class BeforeProcessingMessageEvent extends AMQPEvent
{
    const NAME = AMQPEvent::BEFORE_PROCESSING_MESSAGE;

    /**
     * BeforeProcessingMessageEvent constructor.
     *
     * @param AMQPMessage $AMQPMessage
     */
    public function __construct(Consumer $consumer, AMQPMessage $AMQPMessage)
    {
        $this->setConsumer($consumer);
        $this->setAMQPMessage($AMQPMessage);
    }
}

class AfterProcessingMessageEvent extends AMQPEvent
{
    const NAME = AMQPEvent::AFTER_PROCESSING_MESSAGE;

    /**
     * AfterProcessingMessageEvent constructor.
     *
     * @param AMQPMessage $AMQPMessage
     */
    public function __construct(Consumer $consumer, AMQPMessage $AMQPMessage)
    {
        $this->setConsumer($consumer);
        $this->setAMQPMessage($AMQPMessage);
    }
}


class OnIdleEvent extends AMQPEvent
{
    const NAME = AMQPEvent::ON_IDLE;

    /**
     * OnIdleEvent constructor.
     *
     * @param AMQPMessage $AMQPMessage
     */
    public function __construct(Consumer $consumer)
    {
        $this->setConsumer($consumer);
        
        $this->forceStop = true;
    }
}

public function indexAction($name, ProducerInterface $uploadPictureProducer)
{
    $msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png');
    $uploadPictureProducer->publish(serialize($msg));
}



//src/Acme/DemoBundle/Consumer/UploadPictureConsumer.php

namespace Acme\DemoBundle\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class UploadPictureConsumer implements ConsumerInterface
{
    public function execute(AMQPMessage $msg)
    {
        //Process picture upload.
        //$msg will be an instance of `PhpAmqpLib\Message\AMQPMessage` with the $msg->body being the data sent over RabbitMQ.

        $isUploadSuccess = someUploadPictureMethod();
        if (!$isUploadSuccess) {
            // If your image upload failed due to a temporary error you can return false
            // from your callback so the message will be rejected by the consumer and
            // requeued by RabbitMQ.
            // Any other value not equal to false will acknowledge the message and remove it
            // from the queue
            return false;
        }
    }
}

public function indexAction($name)
{
    $client = $this->get('old_sound_rabbit_mq.integer_store_rpc');
    $client->addRequest(serialize(array('min' => 0, 'max' => 10)), 'random_int', 'request_id');
    $replies = $client->getReplies();
}

public function indexAction($name)
{
    $expiration = 5000; // milliseconds
    $client = $this->get('old_sound_rabbit_mq.integer_store_rpc');
    $client->addRequest($body, $server, $requestId, $routingKey, $expiration);
    try {
        $replies = $client->getReplies();
        // process $replies['request_id'];
    } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
        // handle timeout
    }
}

public function indexAction($name)
{
    $client = $this->get('old_sound_rabbit_mq.parallel_rpc');
    $client->addRequest($name, 'char_count', 'char_count');
    $client->addRequest(serialize(array('min' => 0, 'max' => 10)), 'random_int', 'random_int');
    $replies = $client->getReplies();
}

public function indexAction()
{    
    $msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png');
    $additionalProperties = ['priority' => 10] ; 
    $routing_key = '';
    $this->get('old_sound_rabbit_mq.upload_picture_producer')->publish(serialize($msg), $routing_key , $additionalProperties );
}

namespace AppBundle\Service;

use OldSound\RabbitMqBundle\RabbitMq\BatchConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class DevckBasicConsumer implements BatchConsumerInterface
{
    /**
     * @inheritDoc
     */
    public function batchExecute(array $messages)
    {
        echo sprintf('Doing batch execution%s', PHP_EOL);
        foreach ($messages as $message) {
            $this->executeSomeLogicPerMessage($message);
        }

        // you ack all messages got in batch
        return true; 
    }
}

namespace AppBundle\Service;

use OldSound\RabbitMqBundle\RabbitMq\BatchConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class DevckBasicConsumer implements BatchConsumerInterface
{
    /**
     * @inheritDoc
     */
    public function batchExecute(array $messages)
    {
        echo sprintf('Doing batch execution%s', PHP_EOL);
        $result = [];
        /** @var AMQPMessage $message */
        foreach ($messages as $message) {
            $result[$message->getDeliveryTag()] = $this->executeSomeLogicPerMessage($message);
        }

        // you ack only some messages that have return true
        // e.g:
        // $return = [
        //      1 => true,
        //      2 => true,
        //      3 => false,
        //      4 => true,
        //      5 => -1,
        //      6 => 2,
        //  ];
        // The following will happen:
        //  * ack: 1,2,4
        //  * reject and requeq: 3
        //  * nack and requeue: 6
        //  * reject and drop: 5
        return $result;
    }
}
bash
$ composer 

{
    "  "php-amqplib/rabbitmq-bundle": "^2.0",
    }
}