PHP code example of rex / mq-for-laravel

1. Go to this page and download the library: Download rex/mq-for-laravel library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

rex / mq-for-laravel example snippets


use Rex\MessageQueue\Facades\MQ;

$channel = MQ::connection('default');	

#=================================================
#  消费者持续订阅获取单条消息
#  $callback需返回true/false,队列才能明确是否删除消息    
#=================================================
$queue = 'queue'; // 可以不传$queue,默认取 config/mq.php 中的defaultQueue
$channel->consume($queue, $is_ack = true, function($message){
    try {
        echo $message->getBody() . PHP_EOL;
        return true; // 回调为true,队列删除消息
    } catch (\Exception $e) {
        return false; // 回调为false,消息重新入队
    }	
});

$channel->start();	

use MQ;

$channel = MQ::connection('default');	

#===================================================================
#  消费者单条地获取消息,多并发下可能出现意外情况
#  此模式影响RabbitMQ性能,高吞吐量时建议用推模式
#===================================================================
$queue = 'queue'; // 可以不传$queue,默认取 config/mq.php 中的defaultQueue
while ($channel->size($queue) > 0) {
    try {
        $data = json_decode($channel->pop($queue)->getBody(), true);
        if ($data == 'good') {
            # todo 正常消费后ack
            $channel->ack();
        } else {
            $channel->reject(); // 拒绝消息
            MQ::connection('dlx')->push($data); // 转入死信
        }
    } catch (\Exception $e) {
        echo $e->getMessage();
    }
}

use Rex\MessageQueue\Facades\MQ;

$channel = MQ::connection('default');	
// 注册心跳
$heartbeatHandler = new PCNTLHeartbeatSender($channel->getConnection());  // Windows下没有pcntl扩展 , 在Windows测试代码可以注释掉这行,上测试或者生产把这行加回来
$heartbeatHandler->register(); // Windows下没有pcntl扩展 , 在Windows测试代码可以注释掉这行,上测试或者生产把这行加回来
#=================================================
#  消费者持续订阅获取单条消息
#  $callback需返回true/false,队列才能明确是否删除消息    
#=================================================
$queue = 'queue'; // 可以不传$queue,默认取 config/mq.php 中的defaultQueue
$channel->consume($queue, $is_ack = true, function($message){
    try {
        echo $message->getBody() . PHP_EOL;
        return true; // 回调为true,队列删除消息
    } catch (\Exception $e) {
        return false; // 回调为false,消息重新入队
    }	
});

$channel->start();	

use MQ;

$channel = MQ::connection('default');	
// 注册心跳
$heartbeatHandler = new PCNTLHeartbeatSender($channel->getConnection());  //Windows下没有pcntl扩展 , 在Windows测试代码可以注释掉这行,上测试或者生产把这行加回来
$heartbeatHandler->register();  // Windows下没有pcntl扩展 , 在Windows测试代码可以注释掉这行,上测试或者生产把这行加回来
#===================================================================
#  消费者单条地获取消息,多并发下可能出现意外情况
#  此模式影响RabbitMQ性能,高吞吐量时建议用推模式
#===================================================================
$queue = 'queue'; // 可以不传$queue,默认取 config/mq.php 中的defaultQueue
while ($channel->size($queue) > 0) {
    try {
        $data = json_decode($channel->pop($queue)->getBody(), true);
        if ($data == 'good') {
            # todo 正常消费后ack
            $channel->ack();
        } else {
            $channel->reject(); // 拒绝消息
            MQ::connection('dlx')->push($data); // 转入死信
        }
    } catch (\Exception $e) {
        echo $e->getMessage();
    }
}