PHP code example of thesis / amqp

1. Go to the download page and download the thesis/amqp library. Choose the download type "require".

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

thesis / amqp example snippets




declare(strict_types=1);

use Thesis\Amqp\Config;

// Build the connection config from a single-host AMQP URI (guest/guest credentials, localhost:5672).
$config = Config::fromURI('amqp://guest:guest@localhost:5672/');



declare(strict_types=1);

use Thesis\Amqp\Config;

// The authority part lists two host:port pairs separated by a comma.
// NOTE(review): presumably these are alternative nodes to connect to — confirm against the library docs.
$config = Config::fromURI('amqp://guest:guest@localhost:5672,localhost:5673/');



declare(strict_types=1);

use Thesis\Amqp\Config;

// Build the config from an associative array; here 'urls' is a one-element list of host:port strings.
$config = Config::fromArray([
    'scheme' => 'amqp',
    'urls' => ['localhost:5672'],
    'user' => 'guest',
    'password' => 'guest',
]);



declare(strict_types=1);

use Thesis\Amqp\Config;

// Construct Config directly with named arguments; only urls, user, vhost and authMechanisms
// are passed here, so every other constructor parameter keeps its declared default.
$config = new Config(
    urls: ['localhost:5672'],
    user: 'guest',
    vhost: '/test',
    authMechanisms: ['plain', 'amqplain'],
);



declare(strict_types=1);

use Thesis\Amqp\Config;

// Use the library's default configuration; the trailing comment gives the equivalent URI.
$config = Config::default(); // amqp://guest:guest@localhost:5672/



declare(strict_types=1);

use Thesis\Amqp\Config;

// The URI path is "/test".
// NOTE(review): presumably the path selects the virtual host — confirm against the library docs.
$config = Config::fromURI('amqp://guest:guest@localhost:5672/test');



declare(strict_types=1);

use Thesis\Amqp\Config;

// The auth_mechanism query parameter is repeated to pass two mechanisms: amqplain, then plain.
$config = Config::fromURI('amqp://guest:guest@localhost:5672/?auth_mechanism=amqplain&auth_mechanism=plain');



declare(strict_types=1);

use Thesis\Amqp\Config;

// Sets the heartbeat query parameter to 30 (presumably seconds — confirm the unit upstream).
$config = Config::fromURI('amqp://guest:guest@localhost:5672/?heartbeat=30');



declare(strict_types=1);

use Thesis\Amqp\Config;

// Sets the connection_timeout query parameter to 10 (presumably seconds — confirm the unit upstream).
$config = Config::fromURI('amqp://guest:guest@localhost:5672/?connection_timeout=10');



declare(strict_types=1);

use Thesis\Amqp\Config;

// Sets the channel_max query parameter to 30000.
$config = Config::fromURI('amqp://guest:guest@localhost:5672/?channel_max=30000');



declare(strict_types=1);

use Thesis\Amqp\Config;

// Sets the frame_max query parameter to 50000 (presumably bytes — confirm the unit upstream).
$config = Config::fromURI('amqp://guest:guest@localhost:5672/?frame_max=50000');



declare(strict_types=1);

use Thesis\Amqp\Config;

// Passes tcp_nodelay=false in the query string.
$config = Config::fromURI('amqp://guest:guest@localhost:5672/?tcp_nodelay=false');



declare(strict_types=1);

use Thesis\Amqp\Config;
use Thesis\Amqp\Client;

// Create a client from the default configuration.
$client = new Client(Config::default());

// your code here

// Disconnect explicitly once the work is done.
$client->disconnect();



declare(strict_types=1);

use Thesis\Amqp\Config;
use Thesis\Amqp\Client;

// Create a client from the default configuration.
$client = new Client(Config::default());

// Open a channel on the client and close it again when it is no longer needed.
$channel = $client->channel();
$channel->close();

// Disconnect the client after its channel has been closed.
$client->disconnect();



declare(strict_types=1);

use Thesis\Amqp\Config;
use Thesis\Amqp\Client;

$client = new Client(Config::default());

$channel = $client->channel();
// Declare an exchange named 'events' with the durable flag set.
$channel->exchangeDeclare('events', durable: true);



declare(strict_types=1);

use Thesis\Amqp\Config;
use Thesis\Amqp\Client;

$client = new Client(Config::default());

$channel = $client->channel();
// Bind the exchanges 'service.a' and 'service.b' to each other.
// NOTE(review): which argument is the destination and which the source is not visible here — confirm upstream.
$channel->exchangeBind('service.a', 'service.b');



declare(strict_types=1);

use Thesis\Amqp\Config;
use Thesis\Amqp\Client;

$client = new Client(Config::default());

$channel = $client->channel();
// Remove the binding between the exchanges 'service.a' and 'service.b'.
// NOTE(review): which argument is the destination and which the source is not visible here — confirm upstream.
$channel->exchangeUnbind('service.a', 'service.b');



declare(strict_types=1);

use Thesis\Amqp\Config;
use Thesis\Amqp\Client;

$client = new Client(Config::default());

$channel = $client->channel();
// Delete the exchange 'service.a', passing ifUnused: true.
$channel->exchangeDelete('service.a', ifUnused: true);



declare(strict_types=1);

use Thesis\Amqp\Config;
use Thesis\Amqp\Client;

$client = new Client(Config::default());

$channel = $client->channel();
// Declare the queue 'service.a.events'; the call returns an object describing the queue.
$queue = $channel->queueDeclare('service.a.events');

// The returned object exposes `messages` and `consumers` properties.
var_dump($queue->messages, $queue->consumers);



declare(strict_types=1);

use Thesis\Amqp\Config;
use Thesis\Amqp\Client;

$client = new Client(Config::default());

$channel = $client->channel();
// Bind 'service.a.events' and 'service.a'.
// NOTE(review): presumably the first argument is the queue and the second the exchange — confirm upstream.
$channel->queueBind('service.a.events', 'service.a');



declare(strict_types=1);

use Thesis\Amqp\Config;
use Thesis\Amqp\Client;

$client = new Client(Config::default());

$channel = $client->channel();
// Remove the binding between 'service.a.events' and 'service.a'.
// NOTE(review): presumably the first argument is the queue and the second the exchange — confirm upstream.
$channel->queueUnbind('service.a.events', 'service.a');



declare(strict_types=1);

use Thesis\Amqp\Config;
use Thesis\Amqp\Client;

$client = new Client(Config::default());

$channel = $client->channel();
// Purge the queue 'service.a.events' and dump the value the call returns.
// NOTE(review): the variable name suggests this is the number of purged messages — confirm upstream.
$messages = $channel->queuePurge('service.a.events');
var_dump($messages);



declare(strict_types=1);

use Thesis\Amqp\Config;
use Thesis\Amqp\Client;

$client = new Client(Config::default());

$channel = $client->channel();
// Delete the queue 'service.a.events', passing both ifUnused: true and ifEmpty: true,
// and dump the value the call returns.
// NOTE(review): the variable name suggests this is a message count — confirm upstream.
$messages = $channel->queueDelete('service.a.events', ifUnused: true, ifEmpty: true);
var_dump($messages);



declare(strict_types=1);

use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
use Thesis\Amqp\Message;
use Thesis\Amqp\DeliveryMode;
use Thesis\Time\TimeSpan;

$client = new Client(Config::default());

$channel = $client->channel();
// Publish a single message, setting several message properties through named arguments.
// No exchange or routing key is passed, so the publish() defaults for those apply.
$channel->publish(new Message(
    body: '...',
    headers: ['x' => 'y'],
    contentType: 'application/json',
    contentEncoding: 'json',
    deliveryMode: DeliveryMode::Persistent,
    // Expiration is given as a TimeSpan of five seconds.
    expiration: TimeSpan::fromSeconds(5),
));



declare(strict_types=1);

use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
use Thesis\Amqp\Message;
use Thesis\Amqp\PublishMessage;

$client = new Client(Config::default());

$channel = $client->channel();
// Publish three messages in one call; each PublishMessage pairs a Message with its routing key.
$confirmation = $channel->publishBatch([
    new PublishMessage(new Message('x'), routingKey: 'test'),
    new PublishMessage(new Message('y'), routingKey: 'test'),
    new PublishMessage(new Message('z'), routingKey: 'test'),
]);

// Only if $channel->confirmSelect() was called.
// NOTE(review): this example never calls confirmSelect() itself, so the lines below
// only apply when the caller has enabled it beforehand.
$unconfirmed = $confirmation->unconfirmed();

if (\count($unconfirmed) > 0) {
    // retry publish.
}



declare(strict_types=1);

use Thesis\Amqp\Config;
use Thesis\Amqp\Client;

$client = new Client(Config::default());

$channel = $client->channel();
// Fetch a single delivery from 'service.a.events' with noAck: true.
$delivery = $channel->get('service.a.events', noAck: true);

// get() may return null (hence the nullsafe operator), in which case null is dumped.
var_dump($delivery?->message);



declare(strict_types=1);

use Thesis\Amqp\Config;
use Thesis\Amqp\Client;

$client = new Client(Config::default());

$channel = $client->channel();
$delivery = $channel->get('service.a.events');
// Acknowledge through the delivery object itself; the nullsafe call is skipped when get() returned null.
$delivery?->ack();



declare(strict_types=1);

use Thesis\Amqp\Config;
use Thesis\Amqp\Client;

$client = new Client(Config::default());

$channel = $client->channel();
$delivery = $channel->get('service.a.events');
// Alternative form: acknowledge through the channel, guarding against a null delivery explicitly.
if ($delivery !== null) {
    $channel->ack($delivery);
}



declare(strict_types=1);

use Thesis\Amqp\Config;
use Thesis\Amqp\Client;

$client = new Client(Config::default());

$channel = $client->channel();
$delivery = $channel->get('service.a.events');
// Negatively acknowledge through the delivery object with requeue: false;
// the nullsafe call is skipped when get() returned null.
$delivery?->nack(requeue: false);



declare(strict_types=1);

use Thesis\Amqp\Config;
use Thesis\Amqp\Client;

$client = new Client(Config::default());

$channel = $client->channel();
$delivery = $channel->get('service.a.events');
// Alternative form: nack through the channel with requeue: false, guarding against a null delivery.
if ($delivery !== null) {
    $channel->nack($delivery, requeue: false);
}



declare(strict_types=1);

use Thesis\Amqp\Config;
use Thesis\Amqp\Client;

$client = new Client(Config::default());

$channel = $client->channel();
$delivery = $channel->get('service.a.events');
// Reject through the delivery object with requeue: false;
// the nullsafe call is skipped when get() returned null.
$delivery?->reject(requeue: false);



declare(strict_types=1);

use Thesis\Amqp\Config;
use Thesis\Amqp\Client;

$client = new Client(Config::default());

$channel = $client->channel();
$delivery = $channel->get('service.a.events');
// Alternative form: reject through the channel with requeue: false, guarding against a null delivery.
if ($delivery !== null) {
    $channel->reject($delivery, requeue: false);
}



declare(strict_types=1);

use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
use Thesis\Amqp\DeliveryMessage;
use Thesis\Amqp\Channel;

$client = new Client(Config::default());

$channel = $client->channel();

// Handler that settles the delivery itself by nacking it.
// The closure captures nothing: the original `use ($httpclient)` referenced a variable
// that is never defined, which makes PHP emit an "Undefined variable" warning when the
// closure is created, and the captured value was never used.
$handler = static function (DeliveryMessage $delivery): void {
    // handle the delivery with an \Exception
    $delivery->nack();
};

$delivery = $channel->get('test');
\assert($delivery !== null);

try {
    $handler($delivery);
} finally {
    // The delivery is acked here even though the handler already nacked it.
    // NOTE(review): presumably the library ignores a second settlement of the same
    // delivery, which is what this example demonstrates — confirm upstream.
    $delivery->ack();
}



declare(strict_types=1);

use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
use Thesis\Amqp\DeliveryMessage;
use Thesis\Amqp\Channel;

$client = new Client(Config::default());

$channel = $client->channel();

// Set prefetchCount to 1 before starting the consumer.
$channel->qos(prefetchCount: 1);
// Register a callback consumer on 'service.a.events'; the callback receives the delivery
// and a Channel (unused here, hence `$_`). consume() returns the consumer tag.
$consumerTag = $channel->consume(static function (DeliveryMessage $delivery, Channel $_): void {
    var_dump($delivery->message);
    $delivery->ack();    
}, queue: 'service.a.events');

// Cancel the consumer using the tag returned by consume().
$channel->cancel($consumerTag);



declare(strict_types=1);

use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
use Amp;

$client = new Client(Config::default());

$channel = $client->channel();

$channel->qos(prefetchCount: 1);
// Consume 'service.a.events' as an iterator of deliveries, passing size: 1.
$deliveries = $channel->consumeIterator('service.a.events', size: 1);

// Background task: trapSignal is expected to suspend until SIGINT or SIGTERM arrives
// (see the Amp documentation); afterwards the iterator is completed.
// NOTE(review): presumably complete() makes the foreach below finish normally — confirm upstream.
Amp\async(static function () use ($deliveries): void {
    Amp\trapSignal([\SIGINT, \SIGTERM]);
    $deliveries->complete();
});

// Dump and acknowledge every delivery the iterator yields.
foreach ($deliveries as $delivery) {
    var_dump($delivery->message);
    $delivery->ack();
}

$client->disconnect();



declare(strict_types=1);

use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
use Amp;

$client = new Client(Config::default());

$channel = $client->channel();

$channel->qos(prefetchCount: 1);
// Consume 'service.a.events' as an iterator of deliveries, passing size: 1.
$deliveries = $channel->consumeIterator('service.a.events', size: 1);

// Background task: trapSignal is expected to suspend until SIGINT or SIGTERM arrives
// (see the Amp documentation); afterwards the iterator is cancelled with an exception.
Amp\async(static function () use ($deliveries): void {
    Amp\trapSignal([\SIGINT, \SIGTERM]);
    $deliveries->cancel(new \Exception('you should stop'));
});

// Per the inline comment in the catch block, the exception passed to cancel() surfaces
// from the iteration, so the loop is wrapped in try/catch.
try {
    foreach ($deliveries as $delivery) {
        var_dump($delivery->message);
        $delivery->ack();
    }
} catch (\Throwable $e) {
    var_dump($e->getMessage()); // you should stop
}

$client->disconnect();



declare(strict_types=1);

use Thesis\Amqp\Client;
use Thesis\Amqp\Message;
use Thesis\Amqp\Config;

// Open a connection and a channel, then switch the channel into transactional mode.
// NOTE(review): these setup lines were lost when the page was extracted (the snippet began
// mid-expression with `->publish(...)`). They are reconstructed from the sibling examples;
// txSelect() is inferred from the txCommit()/txRollback() calls below — confirm upstream.
$client = new Client(Config::default());

$channel = $client->channel();
$channel->txSelect();

// First transaction: two publishes followed by a commit.
$channel->publish(new Message('...'), routingKey: 'test');
$channel->publish(new Message('...'), routingKey: 'test');
$channel->txCommit();

// Second transaction: two publishes followed by a rollback.
$channel->publish(new Message('...'), routingKey: 'test');
$channel->publish(new Message('...'), routingKey: 'test');
$channel->txRollback();

$client->disconnect();



declare(strict_types=1);

use Thesis\Amqp\Channel;
use Thesis\Amqp\Client;
use Thesis\Amqp\Message;
use Thesis\Amqp\Config;

// NOTE(review): the client/channel setup and the opening of the first transactional() call
// were lost when the page was extracted (only `l): void {` survived). They are reconstructed
// from the second transactional() call below and from the sibling examples — confirm upstream.
$client = new Client(Config::default());

$channel = $client->channel();

// First callback: three publishes, and the callback returns normally.
$channel->transactional(static function (Channel $channel): void {
    $channel->publish(new Message('...'), routingKey: 'test');
    $channel->publish(new Message('...'), routingKey: 'test');
    $channel->publish(new Message('...'), routingKey: 'test');
});

// Second callback: throws after two publishes; the exception propagates out of
// transactional() and is caught here.
// NOTE(review): presumably the transaction is rolled back in this case — confirm upstream.
try {
    $channel->transactional(static function (Channel $channel): void {
        $channel->publish(new Message('...'), routingKey: 'test');
        $channel->publish(new Message('...'), routingKey: 'test');
        throw new \DomainException('Ops.');
    });
} catch (\Throwable $e) {
    var_dump($e->getMessage()); // Ops.
}

$client->disconnect();



declare(strict_types=1);

use Thesis\Amqp\Client;
use Thesis\Amqp\Message;
use Thesis\Amqp\Config;

// NOTE(review): the start of this example was lost when the page was extracted (the first
// line survived only as `firmation = ...`). The setup is reconstructed from the sibling
// examples; confirmSelect() is the call the batch-publish example names as the prerequisite
// for confirmations — confirm upstream.
$client = new Client(Config::default());

$channel = $client->channel();
$channel->confirmSelect();

// publish() may return null (hence the nullsafe operator); otherwise await() yields the result.
$confirmation = $channel->publish(new Message('...'), routingKey: 'test');
var_dump($confirmation?->await());

$client->disconnect();



declare(strict_types=1);

use Thesis\Amqp\Client;
use Thesis\Amqp\PublishConfirmation;
use Thesis\Amqp\Message;
use Thesis\Amqp\Config;

// NOTE(review): the start of this example was lost when the page was extracted (the first
// line survived only as `nfirmations = [];`). The setup is reconstructed from the sibling
// examples; confirmSelect() is the call the batch-publish example names as the prerequisite
// for confirmations — confirm upstream.
$client = new Client(Config::default());

$channel = $client->channel();
$channel->confirmSelect();

// Publish 100 messages, collecting one confirmation per publish.
$confirmations = [];
for ($i = 0; $i < 100; ++$i) {
    $confirmation = $channel->publish(new Message('...'), routingKey: 'test');
    // publish() is nullable; a confirmation is expected here because confirm mode is on.
    \assert($confirmation !== null);

    $confirmations[] = $confirmation;
}

// Wait for all collected confirmations in one call.
PublishConfirmation::awaitAll($confirmations);

$client->disconnect();



declare(strict_types=1);

use Thesis\Amqp\Client;
use Thesis\Amqp\Config;
use Thesis\Amqp\Message;
use Thesis\Amqp\DeliveryMessage;
use Amp;

 // NOTE(review): the beginning of this example was lost when the page was extracted.
 // The client/channel setup and the opening of the callback that the `});` below closes
 // are missing, so the snippet does not parse as shown — restore them from the upstream README.
 // handle returns here
});

// Publish with mandatory: true to a routing key named 'not_exists'.
// NOTE(review): presumably this makes the broker return the message to the handler above — confirm upstream.
$channel->publish(new Message('...'), routingKey: 'not_exists', mandatory: true);

$client->disconnect();



declare(strict_types=1);

use Thesis\Amqp\Client;
use Thesis\Amqp\Config;
use Thesis\Amqp\Message;
use Thesis\Amqp\PublishResult;



// NOTE(review): the setup for this example was lost when the page was extracted ($channel
// was never defined although Client and Config are imported). It is reconstructed from the
// sibling examples; confirmSelect() is the call the batch-publish example names as the
// prerequisite for confirmations — confirm upstream.
$client = new Client(Config::default());

$channel = $client->channel();
$channel->confirmSelect();

// Publish with mandatory: true, then compare the awaited result against PublishResult::Unrouted.
$confirmation = $channel->publish(new Message('abz'), routingKey: 'xxx', mandatory: true);

if ($confirmation?->await() === PublishResult::Unrouted) {
    // handle use case
}