PHP code example of pzr / amqp
2. Go to this page and download the library: Download pzr/amqp library. Choose the download type "require".
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
pzr / amqp example snippets
// Application component definitions for the pzr/amqp consumers, queues and policy API.
// Every entry uses short array syntax so the outer literal matches the nested ones.
return [
    /** RPC consumer */
    'rpcConsumer' => [
        'class' => \pzr\amqp\RpcAmqp::class,
        'host' => '127.0.0.1',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
    ],
    /** Ordinary consumer */
    'consumer' => [
        // 'class' => \pzr\amqp\Amqp::class,
        'class' => \pzr\amqp\AmqpBase::class,
        'host' => '127.0.0.1',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        // 'ackPolicy' => [
        //     'component' => 'PolicyAckRetryCount', // use the retry-count ACK policy; when not specified, the default of the ackPolicy property in AmqpBase applies
        // ]
    ],
    /** Plain queue definition */
    'easyQueue' => [
        'class' => \pzr\amqp\queue\EasyQueue::class,
        'host' => '127.0.0.1',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'queueName' => 'easy_queue',
        'exchangeName' => 'easy_exchange',
        'routingKey' => 'easy',
        // NOTE(review): the two option names below are copied verbatim; confirm their spelling against the library before enabling them.
        // 'serizer' => \pzr\amqp\serializers\PhpSerializer, //default value
        // 'dulicater' => \pzr\amqp\duplicate\DuplicateRandom, //default value
        // 'duplicate' => 0, // number of queue duplicates; set to 0 to disable
        // 'priority' => 10, // configure when defining a priority queue
    ],
    /** Delay queue definition */
    'delayQueue' => [
        'class' => \pzr\amqp\queue\DelayQueue::class,
        'as log' => \pzr\amqp\LogBehavior::class,
        'host' => '127.0.0.1',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'queueName' => 'normal_queue',
        'exchangeName' => 'normal_exchange',
        'routingKey' => 'normal',
        'delayQueueName' => 'delay_queue',
        'delayExchangeName' => 'delay_exchange',
        'delayRoutingKey' => 'delay',
        'ttl' => 5000, //ms
        'duplicate' => 2,
        // Other driver options
    ],
    /** Policy API */
    'policy' => [
        'class' => \pzr\amqp\api\Policy::class,
        'host' => '127.0.0.1',
        'port' => 15672,
        'user' => 'guest',
        'password' => 'guest',
        'policyConfig' => [
            'pattern' => 'easy_queue_*',
            'definition' => [
                'ha-mode' => 'all',
                'ha-sync-mode' => 'manual',
            ],
            'priority' => 0,
            'apply-to' => 'queues',
            'name' => 'easy_queue',
        ],
    ],
    /** RPC queue */
    'rpcQueue' => [
        'class' => \pzr\amqp\queue\RpcQueue::class,
        'host' => '127.0.0.1',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'queueName' => 'rpc',
        'exchangeName' => 'rpc',
        'routingKey' => 'rpc',
        'duplicate' => 2,
    ],
];
// Register a handler that runs before every push on the easyQueue component,
// then push one CountJob.
Yii::$app->easyQueue->on(AmqpBase::EVENT_BEFORE_PUSH, function ($event) {
    Yii::$app->easyQueue->bind(); // bind the queue; if it is already bound this call can be commented out
    // $event->noWait = true; // turn off the client-side message confirmation mechanism
});
// The original snippet closed only the array and the constructor call; the
// final parenthesis below closes push() itself.
Yii::$app->easyQueue->push(new CountJob([
    'count' => 1,
]));
/**
 * Example job whose execute() hands back the value stored in $count.
 */
class CountJob extends \pzr\amqp\AmqpJob
{
    /** @var mixed value returned by execute() */
    public $count;

    /**
     * @return mixed the current value of $count
     */
    public function execute()
    {
        $value = $this->count;

        return $value;
    }
}
// Delay queue: call bind() from the before-push handler, then push one CountJob.
Yii::$app->delayQueue->on(
    AmqpBase::EVENT_BEFORE_PUSH,
    function ($event) {
        $queue = Yii::$app->delayQueue;
        $queue->bind();
    }
);
Yii::$app->delayQueue->push(
    new CountJob(['count' => 1])
);
// Define the queue's maximum priority at bind time.
Yii::$app->easyQueue->on(
    AmqpBase::EVENT_BEFORE_PUSH,
    function ($event) {
        $queue = Yii::$app->easyQueue;
        $queue->setPriority(10)->bind();
    }
);
// Define the priority of each individual message when sending it.
$job = new CountJob(['count' => 1, 'priority' => 2]);
Yii::$app->easyQueue->push($job);
// RPC queue: call bind() from the before-push handler.
Yii::$app->rpcQueue->on(AmqpBase::EVENT_BEFORE_PUSH, function (PushEvent $event) {
    Yii::$app->rpcQueue->bind();
});
// Batch request: build ten RequestJob instances.
// Start from an empty list so the batch never picks up entries from a $jobs
// variable that may already exist in the enclosing scope.
$jobs = [];
for ($i = 1; $i <= 10; $i++) {
    $jobs[] = new RequestJob([
        'request' => 'request_' . $i,
    ]);
}
// qos: number of messages the temporary queue's consumer prefetches;
// timeout: how long the temporary queue waits for consumers to finish, in seconds.
$response = Yii::$app->rpcQueue->setQos(1)->setTimeout(3)->myPublishBatch($jobs);
return $response;
/**
 * Controller whose actions start the queue consumers.
 */
class AmqpController extends Controller
{
    /**
     * Ordinary consumer: forwards both arguments to the `consumer` component.
     *
     * @param mixed $queueName first argument passed to consume()
     * @param mixed $qos second argument passed to consume()
     */
    public function actionConsumer($queueName, $qos)
    {
        $worker = Yii::$app->consumer;
        $worker->consume($queueName, $qos);
    }

    /**
     * RPC consumer: forwards both arguments to the `rpcConsumer` component.
     *
     * @param mixed $queueName first argument passed to consume()
     * @param mixed $qos second argument passed to consume(); 1 when omitted
     */
    public function actionRpcConsumer($queueName, $qos=1)
    {
        $worker = Yii::$app->rpcConsumer;
        $worker->consume($queueName, $qos);
    }
}
// Annotated `policy` component entry (a fragment of the component configuration).
'policy' => [
'class' => \pzr\amqp\api\Policy::class,
'host' => '127.0.0.1', //your hostname
'port' => 15672, //default port
'user' => 'guest', //default user name
'password' => 'guest', //default password
'policyConfig' => [ //policy config
// NOTE(review): read as a regular expression, the trailing `*` repeats only the final "e"; confirm a glob-style wildcard was not intended.
'pattern' => 'easyqueue*', // matches every queue that satisfies this regular expression
'definition' => [
'ha-mode' => 'all', //default all, choose one of [all, exactly, nodes]
'ha-sync-mode' => 'manual', // default manual, choose one of [manual, automatic]
// 'ha-params' => [], //depend on ha-mode
],
'priority' => 0, //default 0
'apply-to' => 'queues', //choose one of [all,queues,exchanges]
'name' => 'easyQueue', //free name
]
]
// Call setPolicy() on the `policy` component.
Yii::$app->policy->setPolicy();
// Template entry: <Queue> and <QueueName> are placeholders for a concrete
// component ID and queue class. The closing bracket was missing, which left
// the array literal open before the next snippet.
'<Queue>' => [
    'class' => \pzr\amqp\queue\<QueueName>::class,
    // ... omitted
    'strict' => true,
],
// Queue entry whose `api` option names a component ID.
'easyQueue' => [
'class' => \pzr\amqp\queue\EasyQueue::class,
// ... omitted
'api' => [
'component' => 'amqpApi',
]
],
// NOTE(review): presumably an alternative form of the `api` option that names the class directly — confirm where this entry belongs.
'api' => [
'class' => \pzr\amqp\api\AmqpApi::class,
]
// `api` entry that uses the Policy class together with its policyConfig settings.
'api' => [
'class' => \pzr\amqp\api\Policy::class,
'policyConfig' => [
'pattern' => 'easy_queue_*',
'definition' => [
'ha-mode' => 'all',
'ha-sync-mode' => 'manual',
],
'priority' => 0,
'apply-to' => 'queues',
'name' => 'easy_queue',
]
]
// Before each push: bind the queue, then call setPolicy() on the object returned by getApi().
Yii::$app->easyQueue->on(
    AmqpBase::EVENT_BEFORE_PUSH,
    function (PushEvent $event) {
        Yii::$app->easyQueue->bind();
        // set the policy
        $api = Yii::$app->easyQueue->getApi();
        $api->setPolicy();
    }
);
// ACK Policy
// Component entries for the three ACK policy classes (a fragment of the component configuration).
'PolicyAckRetryCount' => [
'class' => \pzr\amqp\ack\PolicyAckRetryCount::class,
'retryLimit' => 5 // properties can be specified here to override the defaults
],
'PolicyAckNormal' => [
'class' => \pzr\amqp\ack\PolicyAckNormal::class,
],
'PolicyNoAck' => [
'class' => \pzr\amqp\ack\PolicyNoAck::class,
],
/** Ordinary consumer */
'consumer' => [
// 'class' => \pzr\amqp\Amqp::class,
'class' => \pzr\amqp\AmqpBase::class,
'host' => '127.0.0.1',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
// 'ackPolicy' => [
// 'component' => 'PolicyAckRetryCount', // use the retry-count ACK policy; when not specified, the default of the ackPolicy property in AmqpBase applies
// ]
],
shell
php yii amqp/consumer queueName qos # start the ordinary consumer
php yii amqp/rpc-consumer queueName qos # start the RPC consumer