1. Go to this page and download the library: Download pccomponentes/amqp library. Choose the download type require.
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
pccomponentes / amqp example snippets
PhpAmqpLib\Connection\AMQPStreamConnection;
use Pccomponentes\Amqp\Builder\ExchangeBuilder;
use Pccomponentes\Amqp\Builder\QueueBuilder;
use Pccomponentes\Amqp\Builder\BindBuilder;
$connection = new AMQPStreamConnection('ampq-rabbitmq', 5672, 'guest', 'guest', 'my_vhost');
$queueBuilder = (new QueueBuilder('queue_example'))
->durable()
->noAutoDelete();
$exchangeBuilder = (new ExchangeBuilder('exchange_example', ExchangeBuilder::TYPE_FANOUT))
->durable()
->noAutoDelete();
$bindBuilder = new BindBuilder('queue_example', 'exchange_example', '');
$channel = $connection->channel();
$queueBuilder->build($channel);
$exchangeBuilder->build($channel);
$bindBuilder->build($channel);
AmqpLib\Connection\AbstractConnection
AmqpLib\Message\AMQPMessage
PhpAmqpLib\Connection\AMQPStreamConnection;
use Pccomponentes\Amqp\Builder\BasicPublishBuilder;
use Pccomponentes\Amqp\Builder\MessageBuilder;
use Pccomponentes\Amqp\Publisher\Publisher;
$connection = new AMQPStreamConnection('ampq-rabbitmq', 5672, 'guest', 'guest', 'my_vhost');
$basicPublishBuilder = (new BasicPublishBuilder('exchange_example'))
->noImmediate()
->noMandatory();
$messageBuilder = (new MessageBuilder())
->contentTypeJson()
->deliveryModePersistent();
$publisher = new Publisher($connection, $basicPublishBuilder, $messageBuilder);
$publisher->send('{"message" : "example"}', 'example');
$publisher->close();
AmqpLib\Connection\AbstractConnection
AmqpLib\Message\AMQPMessage
PhpAmqpLib\Connection\AMQPStreamConnection;
use Pccomponentes\Amqp\Builder\BasicConsumeBuilder;
use Pccomponentes\Amqp\Subscriber\SubscriberMessage;
use Pccomponentes\Amqp\Subscriber\SubscriberCallback;
use Pccomponentes\Amqp\Subscriber\Subscriber;
$connection = new AMQPStreamConnection('ampq-rabbitmq', 5672, 'guest', 'guest', 'my_vhost');
$basicConsumeBuilder = new BasicConsumeBuilder('queue_example');
$basicConsumeBuilder
->wait()
->ack()
->local()
->prefetchSize(0)
->prefetchCount(1)
->noPrefetchGlobal();
$callback = new class implements SubscriberCallback
{
public function execute(SubscriberMessage $message): void
{
\var_dump($message->message()->getBody());
$message->ack();
}
};
$subscriber = new Subscriber($connection, $basicConsumeBuilder, $callback);
$subscriber->listen(3, 10);
PhpAmqpLib\Connection\AMQPStreamConnection;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\MessageBus;
use Pccomponentes\Amqp\Builder\BasicConsumeBuilder;
use Pccomponentes\Amqp\Messenger\MessageBusSusbcriberCallback;
use Pccomponentes\Amqp\Subscriber\Subscriber;
$connection = new AMQPStreamConnection('ampq-rabbitmq', 5672, 'guest', 'guest', 'my_vhost');
$messageBusMiddleware = new class() implements MiddlewareInterface
{
public function handle($message, callable $next)
{
\var_dump($message->message()->getBody());
$message->ack();
return $next($message);
}
};
$basicConsumeBuilder = new BasicConsumeBuilder('queue_example');
$basicConsumeBuilder
->wait()
->ack()
->local()
->prefetchSize(0)
->prefetchCount(1)
->noPrefetchGlobal();
$messageBus = new MessageBus([$messageBusMiddleware]);
$toMessageBusCallback = new MessageBusSusbcriberCallback($messageBus);
$subscriber = new Subscriber($connection, $basicConsumeBuilder, $toMessageBusCallback);
$subscriber->listen(1, 10);
PhpAmqpLib\Connection\AMQPStreamConnection;
use Pccomponentes\Amqp\Builder\BasicPublishBuilder;
use Pccomponentes\Amqp\Builder\MessageBuilder;
use Pccomponentes\Amqp\Publisher\Publisher;
use Pccomponentes\Amqp\Messenger\PublisherMiddleware;
use Pccomponentes\Amqp\Messenger\MessageSerializer;
use Symfony\Component\Messenger\MessageBus;
$connection = new AMQPStreamConnection('ampq-rabbitmq', 5672, 'guest', 'guest', 'my_vhost');
$basicPublishBuilder = (new BasicPublishBuilder('exchange_example'))
->noImmediate()
->noMandatory();
$messageBuilder = (new MessageBuilder())
->contentTypeJson()
->deliveryModePersistent();
$publisher = new Publisher($connection, $basicPublishBuilder, $messageBuilder);
$messageSerializer = new class implements MessageSerializer
{
public function serialize($message): string
{
return \json_encode($message);
}
public function routingKey($message): string
{
return $message->topic;
}
};
$publisherMiddleware = new PublisherMiddleware($publisher, $messageSerializer);
$messageBus = new MessageBus([$publisherMiddleware]);
$message = \json_decode(\json_encode(['body' => 'body example', 'topic' => 'topic_example']));
$messageBus->dispatch($message);
#!/usr/bin/env php
onsole\Application;
use Pccomponentes\Amqp\Command\SubscriberCommand;
use Pccomponentes\Amqp\Subscriber\Subscriber;
$subscriber = new Subscriber(/* argumentos */);
$application = new Application();
$application->addCommands(
[
new SubscriberCommand('custom', $subscriber)
]
);
$application->run();
Loading please wait ...
Before you can download the PHP files, the dependencies should be resolved. This can take some minutes. Please be patient.