1. Go to this page and download the library: Download marwanalsoltany/amqp-agent library. Choose the download type "require".
2. Extract the ZIP file and open index.php.
3. Add the following code to index.php:
<?php
// Load Composer's autoloader. The path is anchored to __DIR__ so the include
// resolves relative to this file, not to the current working directory or include_path.
require_once __DIR__ . '/vendor/autoload.php';
/* Start to develop here. Best regards https://php-download.com/ */
marwanalsoltany / amqp-agent example snippets
// Publisher
// NOTE(review): $messages is not defined in this overview snippet; the publisher demos
// further down pass an array of message strings.
$publisher = new Publisher();
$publisher->work($messages);
// Consumer
// NOTE(review): $callback is not defined in this overview snippet; supply your own message handler.
$consumer = new Consumer();
$consumer->work($callback);
// RPC Client
// Connects, sends $request, keeps the returned value in $response, then disconnects.
// NOTE(review): $request is not defined here; the advanced RPC client demo passes a JSON string.
$rpcClient = new ClientEndpoint();
$rpcClient->connect();
$response = $rpcClient->request($request);
$rpcClient->disconnect();
// RPC Server
// Connects, calls respond() with $callback, keeps the returned value in $request, then disconnects.
// NOTE(review): $callback is not defined here; the advanced RPC server demo passes a callable string.
$rpcServer = new ServerEndpoint();
$rpcServer->connect();
$request = $rpcServer->respond($callback);
$rpcServer->disconnect();
// Instantiating Demo
use MAKS\AmqpAgent\Client;
use MAKS\AmqpAgent\Config;
use MAKS\AmqpAgent\Worker\Publisher;
use MAKS\AmqpAgent\Worker\PublisherSingleton;
use MAKS\AmqpAgent\Worker\Consumer;
use MAKS\AmqpAgent\Worker\ConsumerSingleton;
use MAKS\AmqpAgent\RPC\ClientEndpoint;
use MAKS\AmqpAgent\RPC\ServerEndpoint;
// Option 1: instantiate each worker/endpoint directly with `new`,
// or obtain it through the getInstance() method of its singleton class.
$publisher1 = new Publisher(/* parameters can be passed here */);
$publisher2 = PublisherSingleton::getInstance(/* parameters can be passed here */);
$consumer1 = new Consumer(/* parameters can be passed here */);
$consumer2 = ConsumerSingleton::getInstance(/* parameters can be passed here */);
$rpcClientA = new ClientEndpoint(/* parameters can be passed here */);
$rpcServerA = new ServerEndpoint(/* parameters can be passed here */);
// Option 2: build a Client from a Config object and retrieve the workers/endpoints from it.
// The parameters from this Config object will be passed to the workers.
$config = new Config('path/to/your/config-file.php');
$client = new Client($config); // path can also be passed directly to Client
// Each service has a dedicated getter and can alternatively be fetched by name via get().
$publisher3 = $client->getPublisher(); // or $client->get('publisher');
$consumer3 = $client->getConsumer(); // or $client->get('consumer');
$rpcClientB = $client->getClientEndpoint(); // or $client->get('client.endpoint');
$rpcServerB = $client->getServerEndpoint(); // or $client->get('server.endpoint');
// Use $client->gettable() to get an array of all available services.
// Publisher Demo 1
// Three sample payloads to publish.
$messages = [
    'This is an example message. ID [1].',
    'This is an example message. ID [2].',
    'This is an example message. ID [3].',
];

// Each option set is named up front and then handed to the constructor positionally.
$connectionOptions = [
    'host' => 'localhost',
    'user' => 'guest',
    'password' => 'guest',
];
$channelOptions = [];
$queueOptions = [
    'queue' => 'test.messages.queue',
];
$exchangeOptions = [
    'exchange' => 'test.messages.exchange',
];
$bindOptions = [
    'queue' => 'test.messages.queue',
    'exchange' => 'test.messages.exchange',
];
$messageOptions = [
    'properties' => [
        'content_type' => 'text/plain',
    ],
];
$publishOptions = [
    'exchange' => 'test.messages.exchange',
];

$publisher = new Publisher(
    $connectionOptions,
    $channelOptions,
    $queueOptions,
    $exchangeOptions,
    $bindOptions,
    $messageOptions,
    $publishOptions
);

// Variant I (1): set everything up step by step through the fluent interface.
$publisher
    ->connect()
    ->queue()
    ->exchange()
    ->bind();
foreach ($messages as $text) {
    $publisher->publish($text);
}
$publisher->disconnect();

// Variant I (2): a single prepare() call takes the place of the setup chain above.
$publisher->prepare();
foreach ($messages as $text) {
    $publisher->publish($text);
}
$publisher->disconnect();

// Variant I (3): hand the whole message list to work().
$publisher->work($messages);
// Publisher Demo 2
// Same sample payloads as in the first demo.
$messages = [
    'This is an example message. ID [1].',
    'This is an example message. ID [2].',
    'This is an example message. ID [3].',
];

$publisher = new Publisher();

// connect() accepts no arguments, so the connection options are assigned
// to the public property beforehand.
// Starting from v1.1.0, you can use getNewConnection(),
// setConnection(), getNewChannel(), and setChannel() instead.
$publisher->connectionOptions = [
    'host' => 'localhost',
    'user' => 'guest',
    'password' => 'guest',
];
$publisher->connect();

// Queue and exchange names are reused by the declare, bind, and publish calls.
$queueName = 'test.messages.queue';
$exchangeName = 'test.messages.exchange';

$publisher->queue(['queue' => $queueName]);
$publisher->exchange(['exchange' => $exchangeName]);
$publisher->bind([
    'queue' => $queueName,
    'exchange' => $exchangeName,
]);

// Options here are given per publish() call instead of through the constructor.
$messageProperties = ['content_type' => 'text/plain'];
$publishOptions = ['exchange' => $exchangeName];
foreach ($messages as $text) {
    $publisher->publish(
        [
            'body' => $text,
            'properties' => $messageProperties,
        ],
        $publishOptions
    );
}

$publisher->disconnect();
// Consumer Demo 2
// Extra value handed to the callback as its second argument (the message is always the first).
$variable = 'This variable is needed in your callback. It will be the second, the first is always the message!';

$consumer = new Consumer();

// connect() accepts no arguments, so the connection options are assigned
// to the public property beforehand.
// Starting from v1.1.0, you can use getNewConnection(),
// setConnection(), getNewChannel(), and setChannel() instead.
$consumer->connectionOptions = [
    'host' => 'localhost',
    'user' => 'guest',
    'password' => 'guest',
];
$consumer->connect();

// The same queue name is used for the declaration and for consuming.
$queueName = 'test.messages.queue';
$consumer->queue(['queue' => $queueName]);
$consumer->qos(['prefetch_count' => 10]);

// Callback given as a [class, method] pair, followed by its extra arguments and the consume options.
$handler = ['YourNamespace\YourClass', 'yourCallback'];
$handlerArguments = [$variable];
$consumeOptions = ['queue' => $queueName];
$consumer->consume($handler, $handlerArguments, $consumeOptions);

$consumer->wait();
$consumer->disconnect();
// Advanced Publisher Demo
use MAKS\AmqpAgent\Client;
use MAKS\AmqpAgent\Config;
use MAKS\AmqpAgent\Worker\Publisher;
use MAKS\AmqpAgent\Helper\Serializer;
// Preparing some data to work with: 10000 records with sequential ids 0..9999.
$data = array_map(
    static function (int $id): array {
        return [
            'id' => $id,
            // Every third id (0, 3, 6, ...) is tagged with high importance.
            'importance' => $id % 3 === 0 ? 'high' : 'low',
            'text' => 'Test message with ID ' . $id,
        ];
    },
    range(0, 9999)
);
// Config without a file path: falls back to the default config.
// Starting from v1.2.2, you can use has(), get(), set() methods to modify config values.
$config = new Config();

// The client hands out the services used below.
$client = new Client($config);

/** @var \MAKS\AmqpAgent\Helper\Serializer $serializer */
$serializer = $client->get('serializer');

/** @var \MAKS\AmqpAgent\Worker\Publisher $publisher */
$publisher = $client->get('publisher');

// Connecting to RabbitMQ server using the default config.
// host: localhost, port: 5672, username: guest, password: guest.
$publisher->connect();

// Names shared by the queue declaration, the exchange declaration, and the binding.
$queueName = 'high.and.low.importance.queue';
$exchangeName = 'high.and.low.importance.exchange';

// The high/low importance queue is lazy and accepts priority messages (up to priority 2).
$queueArguments = $publisher->arguments([
    'x-max-priority' => 2,
    'x-queue-mode' => 'lazy',
]);
$publisher->queue([
    'queue' => $queueName,
    'arguments' => $queueArguments,
]);

// Direct exchange that the messages get published to.
$publisher->exchange([
    'exchange' => $exchangeName,
    'type' => 'direct',
]);

// Bind the queue to the exchange.
$publisher->bind([
    'queue' => $queueName,
    'exchange' => $exchangeName,
]);
// Publishing messages according to their priority.
foreach ($data as $record) {
    $payload = $serializer->serialize($record, 'JSON');

    // High-importance records are wrapped in a message array carrying priority 2.
    // Everything else is sent as the bare payload; not providing a priority falls back to 0.
    $message = $record['importance'] === 'high'
        ? [
            'body' => $payload,
            'properties' => [
                'priority' => 2,
            ],
        ]
        : $payload;

    $publisher->publish(
        $message,
        [
            'exchange' => 'high.and.low.importance.exchange',
        ]
    );
}
// Command message that starts a second consumer once the high importance
// messages are consumed. Its priority of 1 places it just after the high
// importance messages but before the low importance ones.
$startConsumerCommand = Publisher::makeCommand('start', 'consumer');
$startConsumerMessage = [
    'body' => $serializer->serialize($startConsumerCommand, 'JSON'),
    'properties' => [
        'priority' => 1,
    ],
];
$publisher->publish(
    $startConsumerMessage,
    [
        'exchange' => 'high.and.low.importance.exchange',
    ]
);
// Two consumers will be running: the original worker and the one that the
// callback starts later. Each needs its own channel-closing command, so two
// are published. With priority 0 they end up after the low importance messages.
for ($remaining = 2; $remaining > 0; $remaining--) {
    $closeChannelCommand = Publisher::makeCommand('close', 'channel');
    $publisher->publish(
        [
            'body' => $serializer->serialize($closeChannelCommand, 'JSON'),
            'properties' => [
                'priority' => 0,
            ],
        ],
        [
            'exchange' => 'high.and.low.importance.exchange',
        ]
    );
}

// Close the connection with RabbitMQ server.
$publisher->disconnect();
// Advanced Consumer Demo
use MAKS\AmqpAgent\Client;
use MAKS\AmqpAgent\Config;
use MAKS\AmqpAgent\Worker\Consumer;
use MAKS\AmqpAgent\Helper\Serializer;
use MAKS\AmqpAgent\Helper\Logger;
$config = new Config();
$client = new Client($config);

// Logger from the client; it writes into this script's directory under the given filename.
/** @var \MAKS\AmqpAgent\Helper\Logger $logger */
$logger = $client->get('logger');
$logger->setDirectory(__DIR__);
$logger->setFilename('high-and-low-importance-messages');

/** @var \MAKS\AmqpAgent\Helper\Serializer $serializer */
$serializer = $client->get('serializer');

/** @var \MAKS\AmqpAgent\Worker\Consumer $consumer */
$consumer = $client->get('consumer');
$consumer->connect();

// Queue declaration for the consumer. It must match the one on the publisher.
// This step can also be omitted if you're sure that the queue exists on the server.
$queueArguments = $consumer->arguments([
    'x-max-priority' => 2,
    'x-queue-mode' => 'lazy',
]);
$consumer->queue([
    'queue' => 'high.and.low.importance.queue',
    'arguments' => $queueArguments,
]);

// Overwriting the default quality of service.
$consumer->qos(['prefetch_count' => 1]);
// The callback is defined here for demonstration purposes.
// Normally you should separate this in its own class.
/**
 * Handles one delivered message: command messages are acknowledged and executed,
 * all other messages are logged and then acknowledged.
 *
 * @param mixed    $message  The delivered message; its body is unserialized as JSON.
 *                           NOTE(review): presumably a php-amqplib AMQPMessage — confirm.
 * @param mixed    $client   Passed by reference; used to fetch the serializer, logger, and consumer.
 * @param callable $callback This same callback, forwarded to the consumer started by the "start consumer" command.
 *
 * @return void
 */
$callback = function($message, &$client, $callback) {
$data = $client->getSerializer()->unserialize($message->body, 'JSON');
// Command messages are handled in this branch and never reach the logging code below.
if (Consumer::isCommand($data)) {
// A command is acknowledged first, before it gets executed.
Consumer::ack($message);
if (Consumer::hasCommand($data, 'close', 'channel')) {
// Giving time for acknowledgements to take effect,
// because the channel will be closed shortly
sleep(5);
// Close the channel using the delivery info of the message.
Consumer::shutdown($message);
} elseif (Consumer::hasCommand($data, 'start', 'consumer')) {
// Start an additional consumer: same queue declaration and QoS as the
// original one, but on a channel of its own.
$consumer = $client->getConsumer();
// Getting a new channel on the same connection.
$channel = $consumer->getNewChannel();
$consumer->queue(
[
'queue' => 'high.and.low.importance.queue',
'arguments' => $consumer->arguments([
'x-max-priority' => 2,
'x-queue-mode' => 'lazy'
])
],
$channel
);
$consumer->qos(
[
'prefetch_count' => 1,
],
$channel
);
// The new consumer runs this same callback with the same extra arguments.
// uniqid() is appended to the consumer tag so that each started consumer gets a distinct tag.
$consumer->consume(
$callback,
[
&$client,
$callback
],
[
'queue' => 'high.and.low.importance.queue',
'consumer_tag' => 'callback.consumer-' . uniqid()
],
$channel
);
}
// Nothing more to do for a command message.
return;
}
// Regular data message: write its importance and text to the log.
$client->getLogger()->write("({$data['importance']}) - {$data['text']}");
// Sleep for 50ms to mimic some processing.
usleep(50000);
// The final step is acknowledgment so that no data is lost.
Consumer::ack($message);
};
// Register the callback on the queue. The second array holds the extra
// arguments that are handed to the callback after the message itself.
$consumer->consume(
$callback,
[
&$client, // Is used to refetch the consumer, serializer, and logger.
$callback // This gets passed to the consumer that gets started by the callback.
],
[
'queue' => 'high.and.low.importance.queue'
]
);
// Here we have to wait using waitForAll() method
// because we have consumers that start dynamically.
$consumer->waitForAll();
// Close the connection with RabbitMQ server.
$consumer->disconnect();
// Advanced RPC Client Demo
use MAKS\AmqpAgent\Client;
use MAKS\AmqpAgent\Config;
use MAKS\AmqpAgent\RPC\ClientEndpoint;
$config = new Config();
$client = new Client($config);

// Retrieving an RPC client endpoint from the client.
/** @var \MAKS\AmqpAgent\RPC\ClientEndpoint $rpcClient */
$rpcClient = $client->getClientEndpoint();

// Attaching some additional functionality based on events emitted by the endpoint.
// See $rpcClient->on() and $rpcClient->getEvents() methods for more info.
// The php-amqplib classes are referenced by their fully-qualified names because this
// snippet does not import them; an unqualified, unimported name would never match in instanceof.
$rpcClient
    ->on('connection.after.open', function ($connection, $rpcClient, $eventName) {
        printf('%s has emitted [%s] event and is now connected!', get_class($rpcClient), $eventName);
        if ($connection instanceof \PhpAmqpLib\Connection\AMQPStreamConnection) {
            printf(' The connection has currently %d channel(s).', count($connection->channels) - 1);
        }
    })
    ->on('request.before.send', function ($request, $rpcClient, $eventName) {
        printf('%s has emitted [%s] event and is about to send a request!', get_class($rpcClient), $eventName);
        if ($request instanceof \PhpAmqpLib\Message\AMQPMessage) {
            // Set the header first, then report the value read back from the message.
            $request->set('content_type', 'application/json');
            printf(' The request content_type header has been set to: %s', $request->get('content_type'));
        }
    });

// Optionally, you can ping the RabbitMQ server to see if a connection can be established.
$roundtrip = $rpcClient->ping();

// Connect, send the JSON request, keep the returned value in $response, then disconnect.
$rpcClient->connect();
$response = $rpcClient->request('{"command":"some-command","parameter":"some-parameter"}');
$rpcClient->disconnect();
// Advanced RPC Server Demo
use MAKS\AmqpAgent\Client;
use MAKS\AmqpAgent\Config;
use MAKS\AmqpAgent\RPC\ServerEndpoint;
$config = new Config();
$client = new Client($config);

// Retrieving an RPC server from the client.
/** @var \MAKS\AmqpAgent\RPC\ServerEndpoint $rpcServer */
$rpcServer = $client->getServerEndpoint();

// Attaching some additional functionality based on events emitted by the endpoint.
// See $rpcServer->on() and $rpcServer->getEvents() methods for more info.
// AMQPMessage is referenced by its fully-qualified name because this snippet does not
// import it; an unqualified, unimported name would never match in instanceof.
$rpcServer
    ->on('request.on.get', function ($request, $rpcServer, $eventName) {
        printf('%s has emitted [%s] event and has just got a request!', get_class($rpcServer), $eventName);
        if ($request instanceof \PhpAmqpLib\Message\AMQPMessage) {
            printf(' The request has the following body: %s', $request->body);
        }
    });

// Connect, call respond() with the callback given as a callable string, then disconnect.
$rpcServer->connect();
$request = $rpcServer->respond('YourNamespace\YourClass::yourCallback');
$rpcServer->disconnect();
Loading, please wait ...
Before you can download the PHP files, the dependencies must be resolved. This can take a few minutes. Please be patient.