PHP code example of rex / mq-for-laravel
1. Go to this page and download the library: Download rex/mq-for-laravel library . Choose the download type require .
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
rex / mq-for-laravel example snippets
use Rex\MessageQueue\Facades\MQ;
$channel = MQ::connection('default');
#=================================================
# 消费者持续订阅获取单条消息
# $callback需返回true/false,队列才能明确是否删除消息
#=================================================
$queue = 'queue'; // 可以不传$queue,默认取 config/mq.php 中的defaultQueue
$channel->consume($queue, $is_ack = true, function($message){
try {
echo $message->getBody() . PHP_EOL;
return true; // 回调为true,队列删除消息
} catch (\Exception $e) {
return false; // 回调为false,消息重新入队
}
});
$channel->start();
use MQ;
$channel = MQ::connection('default');
#===================================================================
# 消费者单条地获取消息,多并发下可能出现意外情况
# 此模式影响RabbitMQ性能,高吞吐量时建议用推模式
#===================================================================
$queue = 'queue'; // 可以不传$queue,默认取 config/mq.php 中的defaultQueue
while ($channel->size($queue) > 0) {
try {
$data = json_decode($channel->pop($queue)->getBody(), true);
if ($data == 'good') {
# todo 正常消费后ack
$channel->ack();
} else {
$channel->reject(); // 拒绝消息
MQ::connection('dlx')->push($data); // 转入死信
}
} catch (\Exception $e) {
echo $e->getMessage();
}
}
use Rex\MessageQueue\Facades\MQ;
$channel = MQ::connection('default');
// 注册心跳
$heartbeatHandler = new PCNTLHeartbeatSender($channel->getConnection()); // Windows下没有pcntl扩展 , 在Windows测试代码可以注释掉这行,上测试或者生产把这行加回来
$heartbeatHandler->register(); // Windows下没有pcntl扩展 , 在Windows测试代码可以注释掉这行,上测试或者生产把这行加回来
#=================================================
# 消费者持续订阅获取单条消息
# $callback需返回true/false,队列才能明确是否删除消息
#=================================================
$queue = 'queue'; // 可以不传$queue,默认取 config/mq.php 中的defaultQueue
$channel->consume($queue, $is_ack = true, function($message){
try {
echo $message->getBody() . PHP_EOL;
return true; // 回调为true,队列删除消息
} catch (\Exception $e) {
return false; // 回调为false,消息重新入队
}
});
$channel->start();
use MQ;
$channel = MQ::connection('default');
// 注册心跳
$heartbeatHandler = new PCNTLHeartbeatSender($channel->getConnection()); //Windows下没有pcntl扩展 , 在Windows测试代码可以注释掉这行,上测试或者生产把这行加回来
$heartbeatHandler->register(); // Windows下没有pcntl扩展 , 在Windows测试代码可以注释掉这行,上测试或者生产把这行加回来
#===================================================================
# 消费者单条地获取消息,多并发下可能出现意外情况
# 此模式影响RabbitMQ性能,高吞吐量时建议用推模式
#===================================================================
$queue = 'queue'; // 可以不传$queue,默认取 config/mq.php 中的defaultQueue
while ($channel->size($queue) > 0) {
try {
$data = json_decode($channel->pop($queue)->getBody(), true);
if ($data == 'good') {
# todo 正常消费后ack
$channel->ack();
} else {
$channel->reject(); // 拒绝消息
MQ::connection('dlx')->push($data); // 转入死信
}
} catch (\Exception $e) {
echo $e->getMessage();
}
}