PHP code example of pzr / amqp

1. Go to this page and download the library: Download pzr/amqp library. Choose the download type "require".

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

pzr / amqp example snippets



// Application component definitions for the pzr/amqp examples.
return [
    // RPC consumer
    'rpcConsumer' => [
        'class' => \pzr\amqp\RpcAmqp::class,
        'host' => '127.0.0.1',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
    ],

    // Regular consumer
    'consumer' => [
        // 'class' => \pzr\amqp\Amqp::class,
        'class' => \pzr\amqp\AmqpBase::class,
        'host' => '127.0.0.1',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        // Selects the retry-count ACK policy; when left out, the default given
        // by the ackPolicy property of AmqpBase is used.
        // 'ackPolicy' => [
        //     'component' => 'PolicyAckRetryCount',
        // ],
    ],

    // Regular queue definition
    'easyQueue' => [
        'class' => \pzr\amqp\queue\EasyQueue::class,
        'host' => '127.0.0.1',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'queueName' => 'easy_queue',
        'exchangeName' => 'easy_exchange',
        'routingKey' => 'easy',
        // 'serizer' => \pzr\amqp\serializers\PhpSerializer, // default value
        // 'dulicater' => \pzr\amqp\duplicate\DuplicateRandom, // default value
        // 'duplicate' => 0, // number of queue duplicates; set to 0 to disable
        // 'priority' => 10, // configure this when defining a priority queue
    ],

    // Delay queue definition
    'delayQueue' => [
        'class' => \pzr\amqp\queue\DelayQueue::class,
        'as log' => \pzr\amqp\LogBehavior::class,
        'host' => '127.0.0.1',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'queueName' => 'normal_queue',
        'exchangeName' => 'normal_exchange',
        'routingKey' => 'normal',
        'delayQueueName' => 'delay_queue',
        'delayExchangeName' => 'delay_exchange',
        'delayRoutingKey' => 'delay',
        'ttl' => 5000, // milliseconds
        'duplicate' => 2,
        // Other driver options
    ],

    // Policy API
    'policy' => [
        'class' => \pzr\amqp\api\Policy::class,
        'host' => '127.0.0.1',
        'port' => 15672,
        'user' => 'guest',
        'password' => 'guest',
        'policyConfig' => [
            'pattern' => 'easy_queue_*',
            'definition' => [
                'ha-mode' => 'all',
                'ha-sync-mode' => 'manual',
            ],
            'priority' => 0,
            'apply-to' => 'queues',
            'name' => 'easy_queue',
        ],
    ],

    // RPC queue
    'rpcQueue' => [
        'class' => \pzr\amqp\queue\RpcQueue::class,
        'host' => '127.0.0.1',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'queueName' => 'rpc',
        'exchangeName' => 'rpc',
        'routingKey' => 'rpc',
        'duplicate' => 2,
    ],
];

// Bind the queue right before every push. Once the queue is already bound,
// the bind() call below can be commented out.
Yii::$app->easyQueue->on(AmqpBase::EVENT_BEFORE_PUSH, function ($event) {
    Yii::$app->easyQueue->bind();
    // $event->noWait = true; // turns off the client-side message confirmation
});

// Publish a single CountJob to the regular queue.
Yii::$app->easyQueue->push(new CountJob([
    'count' => 1,
]));

/**
 * Example job that carries a single `count` value.
 *
 * Elsewhere in these examples it is built as `new CountJob(['count' => 1])`.
 */
class CountJob extends \pzr\amqp\AmqpJob
{
    /** @var mixed value handed to the job through its configuration array */
    public $count;
    
    /**
     * Returns the stored `count` value unchanged.
     *
     * @return mixed
     */
    public function execute()
    {
        return $this->count;
    }
}

// Bind the delay queue right before every push.
Yii::$app->delayQueue->on(AmqpBase::EVENT_BEFORE_PUSH, static function ($event) {
    Yii::$app->delayQueue->bind();
});

// Publish a single CountJob to the delay queue.
$delayedJob = new CountJob(['count' => 1]);
Yii::$app->delayQueue->push($delayedJob);

// Declare the queue's maximum priority at bind time.
Yii::$app->easyQueue->on(AmqpBase::EVENT_BEFORE_PUSH, static function ($event) {
    $prioritisedQueue = Yii::$app->easyQueue->setPriority(10);
    $prioritisedQueue->bind();
});

// Give each message its own priority when it is sent.
$job = new CountJob(['count' => 1, 'priority' => 2]);
Yii::$app->easyQueue->push($job);

// Bind the RPC queue right before every push.
Yii::$app->rpcQueue->on(AmqpBase::EVENT_BEFORE_PUSH, function (PushEvent $event) {
    Yii::$app->rpcQueue->bind();
});

// Batch request: build ten jobs. Start from an empty list so that a $jobs
// variable already present in this scope cannot leak stale entries into the batch.
$jobs = [];
for ($i = 1; $i <= 10; $i++) {
    $jobs[] = new RequestJob([
        'request' => 'request_' . $i,
    ]);
}

// qos: number of messages the temporary queue's consumer pre-processes;
// timeout: how long the temporary queue waits for the consumer, in seconds.
$response = Yii::$app->rpcQueue->setQos(1)->setTimeout(3)->myPublishBatch($jobs);

return $response;

/**
 * Exposes the queue consumers as controller actions
 * (run as `php yii amqp/consumer ...` and `php yii amqp/rpc-consumer ...`).
 */
class AmqpController extends Controller
{
    /**
     * Starts the regular consumer.
     *
     * @param mixed $queueName passed through as the first argument of consume()
     * @param mixed $qos       passed through as the second argument of consume()
     */
    public function actionConsumer($queueName, $qos)
    {
        $consumer = Yii::$app->consumer;
        $consumer->consume($queueName, $qos);
    }

    /**
     * Starts the RPC consumer.
     *
     * @param mixed $queueName passed through as the first argument of consume()
     * @param mixed $qos       passed through as the second argument of consume(); defaults to 1
     */
    public function actionRpcConsumer($queueName, $qos = 1)
    {
        $rpcConsumer = Yii::$app->rpcConsumer;
        $rpcConsumer->consume($queueName, $qos);
    }
}

// Component definition for the policy API (class \pzr\amqp\api\Policy).
'policy' => [
    'class' =>  \pzr\amqp\api\Policy::class,
    'host' => '127.0.0.1', //your hostname
    'port' => 15672,    //default port
    'user' => 'guest', //default user name
    'password' => 'guest', //default password
    'policyConfig' => [ //policy config
        // Applies to every queue whose name matches this regular expression.
        // NOTE(review): read as a regex, 'easyqueue*' is "easyqueu" followed by
        // zero or more "e" — confirm this is the intended pattern.
        'pattern' => 'easyqueue*',
        'definition' => [
            'ha-mode' => 'all', //default all, choose one of [all, exactly, nodes]
            'ha-sync-mode' => 'manual', // default manual, choose one of [manual, automatic]
            // 'ha-params' => [], //depend on ha-mode
        ],
        'priority' => 0, //default 0
        'apply-to' => 'queues', //choose one of [all,queues,exchanges]
        'name' => 'easyQueue', //free name
    ]
]

// Calls setPolicy() on the `policy` application component.
// NOTE(review): presumably this submits the component's policyConfig to the broker — confirm against the library.
Yii::$app->policy->setPolicy();

// Template entry: replace <Queue> with the component id and <QueueName> with
// the queue class to use. It sets the `strict` option to true.
'<Queue>' => [
    'class' =>  \pzr\amqp\queue\<QueueName>::class,
    // ... (other options omitted)
    'strict' => true,
],

// Queue definition whose `api` option points at a component by id.
'easyQueue' => [
        'class' =>  \pzr\amqp\queue\EasyQueue::class,
        // ... (other options omitted)
        'api' => [
            // NOTE(review): presumably the id of a registered application component — confirm it matches the component's configured name.
            'component' => 'amqpApi',
        ]
    ],

// `api` entry backed by the \pzr\amqp\api\AmqpApi class.
'api' => [
    'class' => \pzr\amqp\api\AmqpApi::class,
]

// `api` entry backed by the \pzr\amqp\api\Policy class, carrying its policy settings.
'api' => [
    'class' => \pzr\amqp\api\Policy::class,
    'policyConfig' => [
        // NOTE(review): presumably interpreted as a regular expression, not a glob — confirm the trailing '_*' is intended.
        'pattern' => 'easy_queue_*',
        'definition' => [
            'ha-mode' => 'all',
            'ha-sync-mode' => 'manual',
        ],
        'priority' => 0,
        'apply-to' => 'queues',
        'name' => 'easy_queue',
    ]
]

// Before every push: bind the queue, then apply the policy through its API object.
Yii::$app->easyQueue->on(AmqpBase::EVENT_BEFORE_PUSH, static function (PushEvent $event) {
    $queue = Yii::$app->easyQueue;
    $queue->bind();

    // Apply the policy.
    $queue->getApi()->setPolicy();
});

// ACK Policy
// One entry per available ACK policy class.
'PolicyAckRetryCount' => [
    'class' => \pzr\amqp\ack\PolicyAckRetryCount::class,
    'retryLimit' => 5       // properties can be specified here to override the defaults
],
'PolicyAckNormal' => [
    'class' => \pzr\amqp\ack\PolicyAckNormal::class,
],
'PolicyNoAck' => [
    'class' => \pzr\amqp\ack\PolicyNoAck::class,
],

/** Regular consumer */
'consumer' => [
    // 'class' =>  \pzr\amqp\Amqp::class,
    'class' =>  \pzr\amqp\AmqpBase::class,
    'host' => '127.0.0.1',
    'port' => 5672,
    'user' => 'guest',
    'password' => 'guest',
    // Uncomment to select the retry-count ACK policy; when left out, the default
    // given by the ackPolicy property of AmqpBase is used.
    // 'ackPolicy' => [
    //     'component' => 'PolicyAckRetryCount',
    // ]
],
shell
php yii amqp/consumer queueName qos # start the regular consumer
php yii amqp/rpc-consumer queueName qos # start the RPC consumer