PHP code example of marwanalsoltany / amqp-agent

1. Go to the download page for the marwanalsoltany/amqp-agent library and download it, choosing the download type "require".

2. Extract the ZIP file and open index.php.

3. Add this code to index.php.
    
        
<?php
// Load the Composer autoloader relative to this file rather than the
// current working directory, so the script also works when it is started
// from another directory (CLI, cron, a different document root).
require_once __DIR__ . '/vendor/autoload.php';

/* Start to develop here. Best regards https://php-download.com/ */

    

marwanalsoltany / amqp-agent example snippets


// Publisher
// Shortest form: construct without arguments, then hand everything to work().
// $messages is a placeholder; this snippet does not define it.
$publisher = new Publisher();
$publisher->work($messages);

// Consumer
// Same shape as the publisher. $callback is a placeholder; this snippet
// does not define it.
$consumer = new Consumer();
$consumer->work($callback);

// RPC Client
// Called in the order connect() -> request() -> disconnect().
// $request is a placeholder; this snippet does not define it.
$rpcClient = new ClientEndpoint();
$rpcClient->connect();
$response = $rpcClient->request($request);
$rpcClient->disconnect();

// RPC Server
// Called in the order connect() -> respond() -> disconnect().
// The value returned by respond() is stored in $request.
$rpcServer = new ServerEndpoint();
$rpcServer->connect();
$request = $rpcServer->respond($callback);
$rpcServer->disconnect();


 // Sample configuration: one array per option group. Replace every
 // "your..." placeholder with the values of your own RabbitMQ setup.
 return [
    // Global
    'connectionOptions' => [
        'host'     => 'your-rabbitmq-server.com',
        'port'     => 5672,
        'user'     => 'your-username',
        'password' => 'your-password',
        'vhost'    => '/'
    ],
    'queueOptions' => [
        'queue'   => 'your.queue.name',
        'durable' => true,
        'nowait'  => false
    ],
    // Publisher
    'exchangeOptions' => [
        'exchange' => 'your.exchange.name',
        'type'     => 'direct'
    ],
    'bindOptions' => [
        'queue'    => 'your.queue.name',
        'exchange' => 'your.exchange.name'
    ],
    'messageOptions' => [
        'properties' => [
            'content_type'     => 'application/json',
            'content_encoding' => 'UTF-8',
            'delivery_mode'    => 2
        ]
    ],
    'publishOptions' => [
        'exchange'    => 'your.exchange.name',
        'routing_key' => 'your.route.name'
    ],
    // Consumer
    'qosOptions' => [
        'prefetch_count' => 25
    ],
    'waitOptions' => [
        'timeout' => 3600
    ],
    'consumeOptions' => [
        'queue'        => 'your.queue.name',
        'consumer_tag' => 'your.consumer.name',
        'callback'     => 'YourNamespace\YourClass::yourCallback'
    ],
    // RPC Endpoints
    'rpcQueueName' => 'your.rpc.queue.name'
];


// Instantiating Demo

use MAKS\AmqpAgent\Client;
use MAKS\AmqpAgent\Config;
use MAKS\AmqpAgent\Worker\Publisher;
use MAKS\AmqpAgent\Worker\PublisherSingleton;
use MAKS\AmqpAgent\Worker\Consumer;
use MAKS\AmqpAgent\Worker\ConsumerSingleton;
use MAKS\AmqpAgent\RPC\ClientEndpoint;
use MAKS\AmqpAgent\RPC\ServerEndpoint;

// Way 1: create the workers yourself, either with `new` or through the
// matching *Singleton class.
$publisher1 = new Publisher(/* parameters can be passed here */);
$publisher2 = PublisherSingleton::getInstance(/* parameters can be passed here */);

$consumer1 = new Consumer(/* parameters can be passed here */);
$consumer2 = ConsumerSingleton::getInstance(/* parameters can be passed here */);

// The RPC endpoints are shown with direct construction only.
$rpcClientA = new ClientEndpoint(/* parameters can be passed here */);
$rpcServerA = new ServerEndpoint(/* parameters can be passed here */);

// Way 2: let a Client build the workers from a Config object.
// the parameters from this Config object will be passed to the workers.
$config = new Config('path/to/your/config-file.php');
$client = new Client($config); // path can also be passed directly to Client

// Each service has a dedicated getter and an equivalent get('<name>') call.
$publisher3 = $client->getPublisher(); // or $client->get('publisher');
$consumer3 = $client->getConsumer(); // or $client->get('consumer');

$rpcClientB = $client->getClientEndpoint(); // or $client->get('client.endpoint');
$rpcServerB = $client->getServerEndpoint(); // or $client->get('server.endpoint');

// Use $client->gettable() to get an array of all available services.


// Publisher Demo 1

// The three plain-text payloads that every variant below publishes.
$messages = [
    'This is an example message. ID [1].',
    'This is an example message. ID [2].',
    'This is an example message. ID [3].'
];

// Each option set gets a name of its own so that the constructor call
// below reads as a list of what is being configured.
$connectionOptions = [
    'host' => 'localhost',
    'user' => 'guest',
    'password' => 'guest'
];
$channelOptions = [];
$queueOptions = ['queue' => 'test.messages.queue'];
$exchangeOptions = ['exchange' => 'test.messages.exchange'];
$bindOptions = [
    'queue' => 'test.messages.queue',
    'exchange' => 'test.messages.exchange'
];
$messageOptions = [
    'properties' => ['content_type' => 'text/plain']
];
$publishOptions = ['exchange' => 'test.messages.exchange'];

$publisher = new Publisher(
    $connectionOptions,
    $channelOptions,
    $queueOptions,
    $exchangeOptions,
    $bindOptions,
    $messageOptions,
    $publishOptions
);

// Variant I (1): every setup step is spelled out in one fluent chain.
$publisher
    ->connect()
    ->queue()
    ->exchange()
    ->bind();
foreach ($messages as $message) {
    $publisher->publish($message);
}
$publisher->disconnect();

// Variant I (2): prepare() takes the place of the setup chain.
$publisher->prepare();
foreach ($messages as $message) {
    $publisher->publish($message);
}
$publisher->disconnect();

// Variant I (3): work() receives the whole message list in a single call.
$publisher->work($messages);


// Publisher Demo 2

// Queue and exchange are named once and reused in every call below.
$queueName = 'test.messages.queue';
$exchangeName = 'test.messages.exchange';

$messages = [
    'This is an example message. ID [1].',
    'This is an example message. ID [2].',
    'This is an example message. ID [3].'
];

$publisher = new Publisher();

// connect() accepts no parameters, so the connection settings are assigned
// to the public property beforehand. Starting from v1.1.0, you can use
// getNewConnection(), setConnection(), getNewChannel(), and setChannel()
// instead.
$publisher->connectionOptions = [
    'host' => 'localhost',
    'user' => 'guest',
    'password' => 'guest'
];
$publisher->connect();

// Here the options are passed per call instead of through the constructor.
$publisher->queue(['queue' => $queueName]);
$publisher->exchange(['exchange' => $exchangeName]);
$publisher->bind(['queue' => $queueName, 'exchange' => $exchangeName]);

// Identical for every message, hence built outside the loop.
$messageProperties = ['content_type' => 'text/plain'];
$publishOptions = ['exchange' => $exchangeName];

foreach ($messages as $text) {
    $publisher->publish(
        ['body' => $text, 'properties' => $messageProperties],
        $publishOptions
    );
}
$publisher->disconnect();


// Consumer Demo 1

// All options are passed positionally to the constructor; the comment at
// the top of each array names the option group it fills.
$consumer = new Consumer(
    [
        // connectionOptions
        'host' => 'localhost',
        'user' => 'guest',
        'password' => 'guest'
    ],
    [
        // channelOptions
    ],
    [
        // queueOptions
        'queue' => 'test.messages.queue'
    ],
    [
        // qosOptions
        // A quality-of-service setting belongs here, not an exchange name.
        // 25 is the value used by the sample configuration file.
        'prefetch_count' => 25
    ],
    [
        // waitOptions
    ],
    [
        // consumeOptions
        'queue' => 'test.messages.queue',
        'callback' => 'YourNamespace\YourClass::yourCallback',
    ],
    [
        // publishOptions
        // NOTE(review): this array looks carried over from the Publisher
        // demo. Confirm that Consumer takes a seventh argument; drop it
        // if it does not.
        'exchange' => 'test.messages.exchange'
    ]
);

// Variant I (1): one call per step.
$consumer->connect();
$consumer->queue();
$consumer->qos();
$consumer->consume();
$consumer->wait();
$consumer->disconnect();

// Variant I (2): prepare() replaces the first steps and the rest is chained.
$consumer->prepare()->consume()->wait()->disconnect();

// Variant I (3): work() receives the callback and is the only call made.
$consumer->work('YourNamespace\YourClass::yourCallback');


// Consumer Demo 2

// Extra value handed to the callback through consume()'s second argument.
$variable = 'This variable is needed in your callback. It will be the second, the first is always the message!';
$queueName = 'test.messages.queue';

$consumer = new Consumer();

// connect() accepts no parameters, so the connection settings are assigned
// to the public property beforehand. Starting from v1.1.0, you can use
// getNewConnection(), setConnection(), getNewChannel(), and setChannel()
// instead.
$consumer->connectionOptions = [
    'host' => 'localhost',
    'user' => 'guest',
    'password' => 'guest'
];
$consumer->connect();
$consumer->queue(['queue' => $queueName]);
$consumer->qos(['prefetch_count' => 10]);

// consume() takes the callable, the additional callback arguments,
// and the consume options, in that order.
$callable = ['YourNamespace\YourClass', 'yourCallback'];
$callableArguments = [$variable];
$consumer->consume($callable, $callableArguments, ['queue' => $queueName]);

$consumer->wait();
$consumer->disconnect();


// RPC Client Demo 1

// Connection settings and queue name are supplied to the constructor,
// so connect() and request() need nothing beyond the request body.
$connectionOptions = [
    'host' => 'localhost',
    'user' => 'guest',
    'password' => 'guest'
];
$queueName = 'your.rpc.queue.name';
$requestBody = '{"command":"some-command","parameter":"some-parameter"}';

$rpcClient = new ClientEndpoint($connectionOptions, $queueName);
$rpcClient->connect();
$response = $rpcClient->request($requestBody);
$rpcClient->disconnect();


// RPC Client Demo 2

// The endpoint is created empty; connection settings and queue name are
// passed to connect(), and the queue name once more to request().
$connectionOptions = [
    'host' => 'localhost',
    'user' => 'guest',
    'password' => 'guest'
];
$queueName = 'your.rpc.queue.name';
$requestBody = '{"command":"some-command","parameter":"some-parameter"}';

$rpcClient = new ClientEndpoint();
$rpcClient->connect($connectionOptions, $queueName);
$response = $rpcClient->request($requestBody, $queueName);
$rpcClient->disconnect();


// RPC Server Demo 1

// Connection settings and queue name are supplied to the constructor,
// so connect() takes nothing and respond() only the callback.
$connectionOptions = [
    'host' => 'localhost',
    'user' => 'guest',
    'password' => 'guest'
];
$queueName = 'your.rpc.queue.name';
$responder = 'YourNamespace\YourClass::yourCallback';

$rpcServer = new ServerEndpoint($connectionOptions, $queueName);
$rpcServer->connect();
$request = $rpcServer->respond($responder);
$rpcServer->disconnect();


// RPC Server Demo 2

// The endpoint is created empty; connection settings and queue name are
// passed to connect(), and the queue name once more to respond().
$connectionOptions = [
    'host' => 'localhost',
    'user' => 'guest',
    'password' => 'guest'
];
$queueName = 'your.rpc.queue.name';
$responder = 'YourNamespace\YourClass::yourCallback';

$rpcServer = new ServerEndpoint();
$rpcServer->connect($connectionOptions, $queueName);
$request = $rpcServer->respond($responder, $queueName);
$rpcServer->disconnect();


// Advanced Publisher Demo

use MAKS\AmqpAgent\Client;
use MAKS\AmqpAgent\Config;
use MAKS\AmqpAgent\Worker\Publisher;
use MAKS\AmqpAgent\Helper\Serializer;

// Preparing some data to work with: 10000 records, each carrying an id,
// an importance tag, and a text.
$data = [];
for ($i = 0; $i < 10000; $i++) {
    $data[] = [
        'id' => $i,
        // Every third id (0, 3, 6, ...) is tagged as high importance.
        // Strict comparison, because both operands are integers.
        'importance' => $i % 3 === 0 ? 'high' : 'low',
        'text' => 'Test message with ID ' . $i
    ];
}

// Build the configuration first. Not passing a config file path falls back
// to the default config. Starting from v1.2.2, you can use has(), get(),
// set() methods to modify config values.
$config = new Config();

// The client is created from that configuration.
$client = new Client($config);

// Both helpers used by this demo are fetched from the client by name.
/** @var \MAKS\AmqpAgent\Helper\Serializer */
$serializer = $client->get('serializer');

/** @var \MAKS\AmqpAgent\Worker\Publisher */
$publisher = $client->get('publisher');

// Queue and exchange are named once and reused in the declarations below.
$queueName = 'high.and.low.importance.queue';
$exchangeName = 'high.and.low.importance.exchange';

// Connecting to RabbitMQ server using the default config.
// host: localhost, port: 5672, username: guest, password: guest.
$publisher->connect();

// The queue that holds both high and low importance messages.
// Note that this queue is lazy and accepts priority messages.
$queueArguments = $publisher->arguments([
    'x-max-priority' => 2,
    'x-queue-mode' => 'lazy'
]);
$publisher->queue([
    'queue' => $queueName,
    'arguments' => $queueArguments
]);

// A direct exchange to publish the messages to.
$publisher->exchange([
    'exchange' => $exchangeName,
    'type' => 'direct'
]);

// Finally the queue is bound to the exchange.
$publisher->bind([
    'queue' => $queueName,
    'exchange' => $exchangeName
]);

// Publishing messages according to their priority.
// Every message goes to the same exchange, so the options are built once.
$publishOptions = ['exchange' => 'high.and.low.importance.exchange'];

foreach ($data as $item) {
    $payload = $serializer->serialize($item, 'JSON');

    // High importance items are wrapped so that they carry priority 2.
    // Everything else is published as the bare payload; not providing a
    // priority will fall back to 0.
    $message = $item['importance'] === 'high'
        ? ['body' => $payload, 'properties' => ['priority' => 2]]
        : $payload;

    $publisher->publish($message, $publishOptions);
}

// Command that starts a new consumer once the high importance messages
// are consumed. Pay attention to the priority: with 1 this message is
// placed just after the high importance messages (2) but before the low
// importance ones (0).
$startConsumerCommand = $serializer->serialize(
    Publisher::makeCommand('start', 'consumer'),
    'JSON'
);

$publisher->publish(
    [
        'body' => $startConsumerCommand,
        'properties' => ['priority' => 1],
    ],
    ['exchange' => 'high.and.low.importance.exchange']
);

// Since we have two consumers now, one from the original worker
// and the other gets started later in the callback, we have
// to publish two channel-closing commands to stop the consumers.
// These will be added at the end after low importance messages.
$closeCommandCount = 2;

// A bounded for-loop replaces the manual countdown: it cannot run away if
// the count is ever changed to zero or a negative number.
for ($sent = 0; $sent < $closeCommandCount; $sent++) {
    $publisher->publish(
        [
            'body' => $serializer->serialize(
                Publisher::makeCommand('close', 'channel'),
                'JSON'
            ),
            'properties' => [
                'priority' => 0
            ],
        ],
        [
            'exchange' => 'high.and.low.importance.exchange'
        ]
    );
}

// Close the connection with RabbitMQ server.
$publisher->disconnect();


// Advanced Consumer Demo

use MAKS\AmqpAgent\Client;
use MAKS\AmqpAgent\Config;
use MAKS\AmqpAgent\Worker\Consumer;
use MAKS\AmqpAgent\Helper\Serializer;
use MAKS\AmqpAgent\Helper\Logger;

// Default configuration and a client created from it.
$config = new Config();
$client = new Client($config);

// The logger is fetched from the client and pointed at this script's
// directory, with a fixed filename.
/** @var \MAKS\AmqpAgent\Helper\Logger */
$logger = $client->get('logger');
$logger->setDirectory(__DIR__);
$logger->setFilename('high-and-low-importance-messages');

// Serializer and consumer are fetched from the client the same way.
/** @var \MAKS\AmqpAgent\Helper\Serializer */
$serializer = $client->get('serializer');

/** @var \MAKS\AmqpAgent\Worker\Consumer */
$consumer = $client->get('consumer');

$consumer->connect();

// Declaring the high and low importance messages queue for the consumer.
// The declaration here must match the one on the publisher. This step
// can also be omitted if you're sure that the queue exists on the server.
$queueArguments = $consumer->arguments([
    'x-max-priority' => 2,
    'x-queue-mode' => 'lazy'
]);
$consumer->queue([
    'queue' => 'high.and.low.importance.queue',
    'arguments' => $queueArguments
]);

// Overwriting the default quality of service.
$consumer->qos(['prefetch_count' => 1]);

// The callback is defined inline for demonstration purposes only;
// normally it belongs in a class of its own.
//
// Parameters: the received message, the client (by reference, used to
// fetch the serializer, logger, and consumer), and the callback itself so
// that it can be handed on to a consumer started from within.
$callback = function ($message, &$client, $callback) {
    $data = $client->getSerializer()->unserialize($message->body, 'JSON');

    // Ordinary message: log it, mimic some work, and acknowledge it last
    // so that no data is lost.
    if (!Consumer::isCommand($data)) {
        $client->getLogger()->write("({$data['importance']}) - {$data['text']}");
        // Sleep for 50ms to mimic some processing.
        usleep(50000);
        Consumer::ack($message);

        return;
    }

    // Commands are acknowledged first and acted upon afterwards.
    Consumer::ack($message);

    if (Consumer::hasCommand($data, 'close', 'channel')) {
        // Giving time for acknowledgements to take effect,
        // because the channel will be closed shortly.
        sleep(5);
        // Close the channel using the delivery info of the message.
        Consumer::shutdown($message);

        return;
    }

    // Any command other than "start consumer" needs no further handling.
    if (!Consumer::hasCommand($data, 'start', 'consumer')) {
        return;
    }

    $worker = $client->getConsumer();
    // Getting a new channel on the same connection.
    $channel = $worker->getNewChannel();

    // Queue and QoS are declared again, this time on the new channel.
    $queueArguments = $worker->arguments([
        'x-max-priority' => 2,
        'x-queue-mode' => 'lazy'
    ]);
    $worker->queue(
        [
            'queue' => 'high.and.low.importance.queue',
            'arguments' => $queueArguments
        ],
        $channel
    );
    $worker->qos(['prefetch_count' => 1], $channel);

    // The callback registers itself on the new channel under a unique tag.
    $worker->consume(
        $callback,
        [
            &$client,
            $callback
        ],
        [
            'queue' => 'high.and.low.importance.queue',
            'consumer_tag' => 'callback.consumer-' . uniqid()
        ],
        $channel
    );
};

// Register the callback for the queue. Unlike the consume() call made
// inside the callback, no channel argument is passed here.
$consumer->consume(
    $callback,
    [
        &$client, // Passed by reference; used to refetch the consumer, serializer, and logger.
        $callback // This gets passed on to the consumer that gets started by the callback.
    ],
    [
        'queue' => 'high.and.low.importance.queue'
    ]
);

// Here we have to wait using waitForAll() method
// because we have consumers that start dynamically.
$consumer->waitForAll();

// Close the connection with RabbitMQ server.
$consumer->disconnect();


// Advanced RPC Client Demo

use MAKS\AmqpAgent\Client;
use MAKS\AmqpAgent\Config;
use MAKS\AmqpAgent\RPC\ClientEndpoint;

$config = new Config();
$client = new Client($config);

// Retrieving an RPC client endpoint from the client.
/** @var \MAKS\AmqpAgent\RPC\ClientEndpoint */
$rpcClient = $client->getClientEndpoint();

// Attaching some additional functionality based on events emitted by the endpoint.
// See $rpcClient->on() and $rpcClient->getEvents() methods for more info.
//
// The php-amqplib classes are written fully qualified in the instanceof
// checks because this snippet has no `use` statement for them; the bare
// names would not resolve to the php-amqplib classes.
$rpcClient
    ->on('connection.after.open', function ($connection, $rpcClient, $eventName) {
        printf('%s has emitted [%s] event and is now connected!', get_class($rpcClient), $eventName);
        if ($connection instanceof \PhpAmqpLib\Connection\AMQPStreamConnection) {
            printf('  The connection has currently %d channel(s).', count($connection->channels) - 1);
        }
    })
    ->on('request.before.send', function ($request, $rpcClient, $eventName) {
        printf('%s has emitted [%s] event and is about to send a request!', get_class($rpcClient), $eventName);
        if ($request instanceof \PhpAmqpLib\Message\AMQPMessage) {
            // The header is set first so that the line printed next reads it back.
            $request->set('content_type', 'application/json');
            printf('  The request content_type header has been set to: %s', $request->get('content_type'));
        }
    });

// Optionally, you can ping the RabbitMQ server to see if a connection can be established.
$roundtrip = $rpcClient->ping();

$rpcClient->connect();
$response = $rpcClient->request('{"command":"some-command","parameter":"some-parameter"}');
$rpcClient->disconnect();


// Advanced RPC Server Demo

use MAKS\AmqpAgent\Client;
use MAKS\AmqpAgent\Config;
use MAKS\AmqpAgent\RPC\ServerEndpoint;

$config = new Config();
$client = new Client($config);

// Retrieving an RPC server from the client.
/** @var \MAKS\AmqpAgent\RPC\ServerEndpoint */
$rpcServer = $client->getServerEndpoint();

// Attaching some additional functionality based on events emitted by the endpoint.
// See $rpcServer->on() and $rpcServer->getEvents() methods for more info.
//
// The php-amqplib message class is written fully qualified in the
// instanceof check because this snippet has no `use` statement for it;
// the bare name would not resolve to the php-amqplib class.
$rpcServer
    ->on('request.on.get', function ($request, $rpcServer, $eventName) {
        printf('%s has emitted [%s] event and has just got a request!', get_class($rpcServer), $eventName);
        if ($request instanceof \PhpAmqpLib\Message\AMQPMessage) {
            printf('  The request has the following body: %s', $request->body);
        }
    });

$rpcServer->connect();
$request = $rpcServer->respond('YourNamespace\YourClass::yourCallback');
$rpcServer->disconnect();