1. Go to the download page and download the thesis/amqp library. Choose the download type "require".
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
thesis/amqp example snippets
// Example: build a Config from an AMQP URI (user, password, host, port and a "/" path).
declare(strict_types=1);
use Thesis\Amqp\Config;
$config = Config::fromURI('amqp://guest:guest@localhost:5672/');
// Example: the URI lists two host:port pairs (localhost:5672 and localhost:5673).
// NOTE(review): presumably these are alternative nodes to connect to — confirm against the library docs.
declare(strict_types=1);
use Thesis\Amqp\Config;
$config = Config::fromURI('amqp://guest:guest@localhost:5672,localhost:5673/');
// Example: construct a Config directly with named arguments instead of a URI.
declare(strict_types=1);
use Thesis\Amqp\Config;
$config = new Config(
urls: ['localhost:5672'],
user: 'guest',
vhost: '/test',
authMechanisms: ['plain', 'amqplain'],
);
// Example: use the default configuration; the inline comment gives the equivalent URI.
declare(strict_types=1);
use Thesis\Amqp\Config;
$config = Config::default(); // amqp://guest:guest@localhost:5672/
// Example: URI whose path component is "/test".
// NOTE(review): presumably the path selects the vhost — confirm against the library docs.
declare(strict_types=1);
use Thesis\Amqp\Config;
$config = Config::fromURI('amqp://guest:guest@localhost:5672/test');
// Example: the auth_mechanism query parameter is given twice (amqplain, then plain).
declare(strict_types=1);
use Thesis\Amqp\Config;
$config = Config::fromURI('amqp://guest:guest@localhost:5672/?auth_mechanism=amqplain&auth_mechanism=plain');
// Example: set the heartbeat query parameter to 30.
// NOTE(review): the unit is not visible here — presumably seconds; confirm.
declare(strict_types=1);
use Thesis\Amqp\Config;
$config = Config::fromURI('amqp://guest:guest@localhost:5672/?heartbeat=30');
// Example: set the connection_timeout query parameter to 10.
// NOTE(review): the unit is not visible here — presumably seconds; confirm.
declare(strict_types=1);
use Thesis\Amqp\Config;
$config = Config::fromURI('amqp://guest:guest@localhost:5672/?connection_timeout=10');
// Example: set the channel_max query parameter to 30000.
declare(strict_types=1);
use Thesis\Amqp\Config;
$config = Config::fromURI('amqp://guest:guest@localhost:5672/?channel_max=30000');
// Example: set the frame_max query parameter to 50000.
declare(strict_types=1);
use Thesis\Amqp\Config;
$config = Config::fromURI('amqp://guest:guest@localhost:5672/?frame_max=50000');
// Example: set the tcp_nodelay query parameter to false.
declare(strict_types=1);
use Thesis\Amqp\Config;
$config = Config::fromURI('amqp://guest:guest@localhost:5672/?tcp_nodelay=false');
// Example: create a client from the default config and disconnect when done.
declare(strict_types=1);
use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
$client = new Client(Config::default());
// your code here
$client->disconnect();
// Example: open a channel on the client, then close the channel before disconnecting the client.
declare(strict_types=1);
use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
$client = new Client(Config::default());
$channel = $client->channel();
$channel->close();
$client->disconnect();
// Example: declare an exchange named 'events' with durable: true.
declare(strict_types=1);
use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
$client = new Client(Config::default());
$channel = $client->channel();
$channel->exchangeDeclare('events', durable: true);
// Example: bind two exchanges.
// NOTE(review): which argument is the destination and which the source is not visible here — confirm.
declare(strict_types=1);
use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
$client = new Client(Config::default());
$channel = $client->channel();
$channel->exchangeBind('service.a', 'service.b');
// Example: remove the binding between the same two exchanges.
declare(strict_types=1);
use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
$client = new Client(Config::default());
$channel = $client->channel();
$channel->exchangeUnbind('service.a', 'service.b');
// Example: delete the exchange 'service.a', passing ifUnused: true.
declare(strict_types=1);
use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
$client = new Client(Config::default());
$channel = $client->channel();
$channel->exchangeDelete('service.a', ifUnused: true);
// Example: declare a queue and dump the messages and consumers properties of the result.
declare(strict_types=1);
use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
$client = new Client(Config::default());
$channel = $client->channel();
$queue = $channel->queueDeclare('service.a.events');
var_dump($queue->messages, $queue->consumers);
// Example: bind a queue.
// NOTE(review): presumably the first argument is the queue and the second the exchange — confirm.
declare(strict_types=1);
use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
$client = new Client(Config::default());
$channel = $client->channel();
$channel->queueBind('service.a.events', 'service.a');
// Example: remove the binding created by queueBind with the same arguments.
declare(strict_types=1);
use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
$client = new Client(Config::default());
$channel = $client->channel();
$channel->queueUnbind('service.a.events', 'service.a');
// Example: purge a queue and dump the returned value.
// NOTE(review): presumably the return value is the number of purged messages — confirm.
declare(strict_types=1);
use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
$client = new Client(Config::default());
$channel = $client->channel();
$messages = $channel->queuePurge('service.a.events');
var_dump($messages);
// Example: delete a queue with ifUnused: true and ifEmpty: true, then dump the returned value.
declare(strict_types=1);
use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
$client = new Client(Config::default());
$channel = $client->channel();
$messages = $channel->queueDelete('service.a.events', ifUnused: true, ifEmpty: true);
var_dump($messages);
// Example: publish a single Message built with named arguments
// (body, headers, content type/encoding, delivery mode, expiration).
// No exchange or routing key is passed in this call.
declare(strict_types=1);
use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
use Thesis\Amqp\Message;
use Thesis\Amqp\DeliveryMode;
use Thesis\Time\TimeSpan;
$client = new Client(Config::default());
$channel = $client->channel();
$channel->publish(new Message(
body: '...',
headers: ['x' => 'y'],
contentType: 'application/json',
contentEncoding: 'json',
deliveryMode: DeliveryMode::Persistent,
expiration: TimeSpan::fromSeconds(5),
));
// Example: publish three messages in one publishBatch() call, each wrapped in a PublishMessage
// with routingKey 'test', then inspect which ones are reported as unconfirmed.
declare(strict_types=1);
use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
use Thesis\Amqp\Message;
use Thesis\Amqp\PublishMessage;
$client = new Client(Config::default());
$channel = $client->channel();
$confirmation = $channel->publishBatch([
new PublishMessage(new Message('x'), routingKey: 'test'),
new PublishMessage(new Message('y'), routingKey: 'test'),
new PublishMessage(new Message('z'), routingKey: 'test'),
]);
// Only if $channel->confirmSelect() was called.
// NOTE(review): this snippet itself never calls confirmSelect(); add the call before publishing if confirmations are needed.
$unconfirmed = $confirmation->unconfirmed();
if (\count($unconfirmed) > 0) {
// retry publish.
}
// Example: fetch one delivery with noAck: true and dump its message.
// The nullsafe operator is used because get() may yield null.
declare(strict_types=1);
use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
$client = new Client(Config::default());
$channel = $client->channel();
$delivery = $channel->get('service.a.events', noAck: true);
var_dump($delivery?->message);
// Example: fetch one delivery and ack it through the delivery object (skipped when get() yields null).
declare(strict_types=1);
use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
$client = new Client(Config::default());
$channel = $client->channel();
$delivery = $channel->get('service.a.events');
$delivery?->ack();
// Example: the same acknowledgement, issued through the channel with an explicit null check.
declare(strict_types=1);
use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
$client = new Client(Config::default());
$channel = $client->channel();
$delivery = $channel->get('service.a.events');
if ($delivery !== null) {
$channel->ack($delivery);
}
// Example: nack a fetched delivery through the delivery object, passing requeue: false.
declare(strict_types=1);
use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
$client = new Client(Config::default());
$channel = $client->channel();
$delivery = $channel->get('service.a.events');
$delivery?->nack(requeue: false);
// Example: the same nack, issued through the channel with an explicit null check.
declare(strict_types=1);
use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
$client = new Client(Config::default());
$channel = $client->channel();
$delivery = $channel->get('service.a.events');
if ($delivery !== null) {
$channel->nack($delivery, requeue: false);
}
// Example: reject a fetched delivery through the delivery object, passing requeue: false.
declare(strict_types=1);
use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
$client = new Client(Config::default());
$channel = $client->channel();
$delivery = $channel->get('service.a.events');
$delivery?->reject(requeue: false);
// Example: the same reject, issued through the channel with an explicit null check.
declare(strict_types=1);
use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
$client = new Client(Config::default());
$channel = $client->channel();
$delivery = $channel->get('service.a.events');
if ($delivery !== null) {
$channel->reject($delivery, requeue: false);
}
// Example: pass one fetched delivery to a handler and acknowledge it afterwards.
declare(strict_types=1);
use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
use Thesis\Amqp\DeliveryMessage;
use Thesis\Amqp\Channel;

$client = new Client(Config::default());
$channel = $client->channel();

// The handler previously captured $httpclient via use (...), but that variable is
// never defined or used in this snippet, so the capture only produced an
// "Undefined variable" warning. Capture your own dependencies here if needed.
$handler = static function (DeliveryMessage $delivery): void {
    // handle the delivery with an \Exception
    $delivery->nack();
};

$delivery = $channel->get('test');
\assert($delivery !== null);

try {
    $handler($delivery);
} finally {
    // NOTE(review): the handler has already nack'ed this delivery, so this is a second
    // acknowledgement of the same delivery. The example appears to rely on the library
    // tolerating that — confirm against the library documentation.
    $delivery->ack();
}
// Example: register a consumer callback on the queue 'service.a.events' with prefetchCount 1.
// consume() returns a consumer tag, which is then passed to cancel().
declare(strict_types=1);
use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
use Thesis\Amqp\DeliveryMessage;
use Thesis\Amqp\Channel;
$client = new Client(Config::default());
$channel = $client->channel();
$channel->qos(prefetchCount: 1);
// The callback receives the delivery and the channel; the channel argument is unused here ($_).
$consumerTag = $channel->consume(static function (DeliveryMessage $delivery, Channel $_): void {
var_dump($delivery->message);
$delivery->ack();
}, queue: 'service.a.events');
$channel->cancel($consumerTag);
// Example: consume deliveries with foreach via consumeIterator().
// A background task waits for SIGINT/SIGTERM and then calls complete() on the iterator.
declare(strict_types=1);
use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
use Amp;
$client = new Client(Config::default());
$channel = $client->channel();
$channel->qos(prefetchCount: 1);
$deliveries = $channel->consumeIterator('service.a.events', size: 1);
// Runs concurrently with the foreach below; trapSignal() is called before complete().
Amp\async(static function () use ($deliveries): void {
Amp\trapSignal([\SIGINT, \SIGTERM]);
$deliveries->complete();
});
foreach ($deliveries as $delivery) {
var_dump($delivery->message);
$delivery->ack();
}
$client->disconnect();
// Example: the same loop, but the background task calls cancel() with an exception.
// The loop is wrapped in try/catch and the caught message is dumped.
declare(strict_types=1);
use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
use Amp;
$client = new Client(Config::default());
$channel = $client->channel();
$channel->qos(prefetchCount: 1);
$deliveries = $channel->consumeIterator('service.a.events', size: 1);
Amp\async(static function () use ($deliveries): void {
Amp\trapSignal([\SIGINT, \SIGTERM]);
$deliveries->cancel(new \Exception('you should stop'));
});
try {
foreach ($deliveries as $delivery) {
var_dump($delivery->message);
$delivery->ack();
}
} catch (\Throwable $e) {
var_dump($e->getMessage()); // you should stop
}
$client->disconnect();
// Example: publish inside explicit transactions, committing the first pair of
// messages and rolling back the second pair.
declare(strict_types=1);
use Thesis\Amqp\Client;
use Thesis\Amqp\Message;
use Thesis\Amqp\Config;

// The setup lines and the receiver of the first publish() call were cut off in the
// original snippet; they are restored here following the other examples.
$client = new Client(Config::default());
$channel = $client->channel();

// NOTE(review): AMQP requires transaction mode to be selected on the channel before
// commit/rollback; txSelect() is assumed to be the library's method for that — confirm.
$channel->txSelect();

$channel->publish(new Message('...'), routingKey: 'test');
$channel->publish(new Message('...'), routingKey: 'test');
$channel->txCommit();

$channel->publish(new Message('...'), routingKey: 'test');
$channel->publish(new Message('...'), routingKey: 'test');
$channel->txRollback();

$client->disconnect();
// Example: run publishes through transactional(), first with a callback that
// completes normally and then with one that throws.
declare(strict_types=1);
use Thesis\Amqp\Channel;
use Thesis\Amqp\Client;
use Thesis\Amqp\Message;
use Thesis\Amqp\Config;

// The setup lines and the opening of the first transactional() call were cut off in
// the original snippet; they are restored here to mirror the second call below.
$client = new Client(Config::default());
$channel = $client->channel();

$channel->transactional(static function (Channel $channel): void {
    $channel->publish(new Message('...'), routingKey: 'test');
    $channel->publish(new Message('...'), routingKey: 'test');
    $channel->publish(new Message('...'), routingKey: 'test');
});

try {
    $channel->transactional(static function (Channel $channel): void {
        $channel->publish(new Message('...'), routingKey: 'test');
        $channel->publish(new Message('...'), routingKey: 'test');

        throw new \DomainException('Ops.');
    });
} catch (\Throwable $e) {
    // The exception thrown inside the callback reaches the caller.
    var_dump($e->getMessage()); // Ops.
}

$client->disconnect();
// Example: publish one message and wait for its confirmation.
declare(strict_types=1);
use Thesis\Amqp\Client;
use Thesis\Amqp\Message;
use Thesis\Amqp\Config;

// The setup lines and the start of the "$confirmation = ..." assignment were cut off
// in the original snippet; they are restored here following the other examples.
$client = new Client(Config::default());
$channel = $client->channel();

// NOTE(review): confirmations are presumably only produced after confirmSelect() has
// been called on the channel — confirm against the library documentation.
$channel->confirmSelect();

$confirmation = $channel->publish(new Message('...'), routingKey: 'test');
// publish() may yield null, hence the nullsafe call.
var_dump($confirmation?->await());

$client->disconnect();
// Example: publish 100 messages, collect their confirmations and await them all at once.
declare(strict_types=1);
use Thesis\Amqp\Client;
use Thesis\Amqp\PublishConfirmation;
use Thesis\Amqp\Message;
use Thesis\Amqp\Config;

// The setup lines and the start of the "$confirmations = [];" statement were cut off
// in the original snippet; they are restored here following the other examples.
$client = new Client(Config::default());
$channel = $client->channel();

// NOTE(review): confirmations are presumably only produced after confirmSelect() has
// been called on the channel — confirm against the library documentation.
$channel->confirmSelect();

$confirmations = [];
for ($i = 0; $i < 100; ++$i) {
    $confirmation = $channel->publish(new Message('...'), routingKey: 'test');
    \assert($confirmation !== null);
    $confirmations[] = $confirmation;
}

PublishConfirmation::awaitAll($confirmations);

$client->disconnect();
// Example: publish with mandatory: true to the routing key 'not_exists' after
// registering a callback that handles returned messages.
declare(strict_types=1);
use Thesis\Amqp\Client;
use Thesis\Amqp\Config;
use Thesis\Amqp\Message;
use Thesis\Amqp\DeliveryMessage;
use Amp;
// NOTE(review): this snippet is truncated. The client/channel setup and the opening of
// the callback registration (the call closed by "});" below) are missing, so the code
// does not parse as-is. Restore those lines from the library documentation before use.
// handle returns here
});
$channel->publish(new Message('...'), routingKey: 'not_exists', mandatory: true);
$client->disconnect();
// Example: publish with mandatory: true and check whether the awaited result is
// PublishResult::Unrouted.
declare(strict_types=1);
use Thesis\Amqp\Client;
use Thesis\Amqp\Config;
use Thesis\Amqp\Message;
use Thesis\Amqp\PublishResult;

// The original snippet used $channel without creating it; the setup is restored here
// following the other examples.
$client = new Client(Config::default());
$channel = $client->channel();

// NOTE(review): confirmations are presumably only produced after confirmSelect() has
// been called on the channel — confirm against the library documentation.
$channel->confirmSelect();

$confirmation = $channel->publish(new Message('abz'), routingKey: 'xxx', mandatory: true);
// publish() may yield null, in which case the comparison below is simply false.
if ($confirmation?->await() === PublishResult::Unrouted) {
    // handle use case
}

// Disconnect at the end, as the other complete examples do.
$client->disconnect();
Loading, please wait...
Before you can download the PHP files, the dependencies must be resolved. This can take a few minutes, so please be patient.