PHP code example of xiaosongshu / rabbitmq

1. Go to this page and download the library: Download xiaosongshu/rabbitmq library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

xiaosongshu / rabbitmq example snippets




namespace xiaosongshu\test;

��队列演示
 * @author yanglong
 * @time 2025年7月23日15:34:29
 * @note 当前版本客户端支持死信队列。自动维护客户端登录。
 * @warning 不要在任何地方使用exit函数,以免引起客户端掉线。
 */
class Demo extends \Xiaosongshu\Rabbitmq\Client
{
    /** 以下是rabbitmq配置 ,请填写您自己的配置 */

    /** @var string $host 服务器地址 */
    public static $host = "192.168.110.72";

    /** @var int $port 服务器端口 */
    public static $port = 5672;

    /** @var string $user 服务器登陆用户 */
    public static $user = "admin";

    /** @var string $pass 服务器登陆密码 */
    public static $pass = "123456";
    /** 指定队列名称 */
    public static $queueName = 'xiaosongshu\test\Demo';
    /** 指定交换机名称 */
    public static $exchangeName = 'xiaosongshu\test\Demo';

    /** @var bool $enableDlx 是否开启死信队列 */
    public static $enableDlx = true;
    
    /** 设置为延迟队列,若使用延迟队列,那么可以不设置 */
    public static $type = self::EXCHANGETYPE_DELAYED;

    /**
     * 业务处理
     * @param array $params
     * @return int
     * @note 你的业务代码应该写在这里,但是建议你的业务逻辑执行时间不要超过60秒,以免引起掉线风险。若你的逻辑执行时间确实超过了60秒,那么建议使用
     * 定时任务或者脚本来执行。
     */
    public static function handle(array $params): int
    {
        // todo 这里需要编写你的正常业务处理逻辑,当前仅为示例代码
        var_dump("正常队列处理",$params);
        if (isset($params['id'])){
            if ($params['id']%3 ==0){
                /** 实际场景不可以抛出任何异常,应用捕获所有异常并处理 ,以免引起程序崩溃,虽然底层已经做了兜底 */
                throw new \Exception("模拟程序故障,抛出异常。");
            }
        }
        # 模拟业务逻辑阻塞
        sleep(4);
        /** 成功,返回ack */
        return self::ACK;
    }

    /**
     * 处理异常消息
     * @param \RuntimeException $exception
     * @return void
     */
    public static function error(\RuntimeException $exception)
    {
        //todo 这里写对于异常的处理,比如日志记录,发送短信通知、邮件通知等等,或者人工介入处理
        //var_dump("发生了异常",$exception->getMessage());
    }

    /**
     * 死信队列处理逻辑
     * @param array $params
     * @return int
     * @note 此处尽量不要编写需要执行60秒以上的业务逻辑,以免引起客户端掉线,若你的业务确实超过了60秒,建议使用单独的定任务或者脚本来处理。
     * @note 此处属于兜底业务逻辑,不建议写太复杂的处理方法,本方法只是作为过期消息,异常消息的一个补充,这只是最后一根稻草。切勿编写过于复杂的逻辑业务。
     */
    public static function dlxHandle(array $params):int
    {
        //todo 这里写死信队列的处理逻辑,若不开启死信队列,则不需要写任何逻辑,直接返回ACK即可
        //var_dump("死信队列处理",$params);
        return self::ACK;
    }
}



namespace xiaosongshu\test;

 echo $i . "\r\n";
    /** 投递普通消息 */
    \xiaosongshu\test\Demo::publish(['name' => 'tom', 'time' => time(), 'id' => $i]);
    sleep(1);
}
echo "投递完成\r\n";


namespace xiaosongshu\test;

�塞,后面的代码不会执行 */
\xiaosongshu\test\Demo::consume();



namespace xiaosongshu\test;

塞,后面的代码不会执行,仅用于windows系统调试,linux系统会自动消费死信队列的消息 */
\xiaosongshu\test\Demo::consumeD();


namespace app\commands;

hu\Rabbitmq\Client
{

    /** 以下是rabbitmq配置 ,请填写您自己的配置 */
    /** @var string $host 服务器地址 */
    public static $host = "127.0.0.1";

    /** @var int $port 服务器端口 */
    public static $port = 5672;

    /** @var string $user 服务器登陆用户 */
    public static $user = "guest";

    /** @var string $pass 服务器登陆密码 */
    public static $pass = "guest";
    
     /** 指定队列名称 */
    public static $queueName = 'app\commands\Demo';
    /** 指定交换机名称 */
    public static $exchangeName = 'app\commands\Demo';
    
    /** @var bool $enableDlx 是否开启死信队列 */
    public static $enableDlx = true;
    
    /**
     * 业务处理
     * @param array $params
     * @return int
     */
    public static function handle(array $params): int
    {
        //TODO 这里写你的业务逻辑
        // ...
        var_dump($params);
        return self::ACK;
        //return self::NACK;
    }
    
      /**
     * 处理异常消息
     * @param \RuntimeException $exception
     * @return void
     */
    public static function error(\RuntimeException $exception)
    {
        var_dump("捕获到了异常",$exception->getMessage());
    }
    
    public static function dlxHandle(array $params):int
    {
        //todo 这里写死信队列的处理逻辑,若不开启死信队列,则不需要写任何逻辑,直接返回ACK即可
        //var_dump("死信队列处理",$params);
        return self::ACK;
    }
}




namespace app\commands;

use yii\console\Controller;

/**
 * @purpose 开启队列消费
 * @note 我只是一个例子
 */
class QueueController extends Controller
{

    /**
     * @api php yii queue/index
     * @return void
     * @throws \Exception
     * @comment 开启消费者
     */
    public function actionIndex()
    {
        Demo::consume();
    }
}

\app\commands\Demo::publish(['name'=>'tome','age'=>15]);



namespace app\service;

�服务,用于写日志,防止多进程日志相互覆盖
 * @author yanglong
 * @time 2025年6月20日12:49:24
 */
class RabbitmqService extends \Xiaosongshu\Rabbitmq\Client
{

    /** 以下是rabbitmq配置 ,请填写您自己的配置 */
    /** @var string $host 服务器地址 */
    public static $host = "127.0.0.1";

    /** @var int $port 服务器端口 */
    public static $port = 5672;

    /** @var string $user 服务器登陆用户 */

    public static $user = "admin";

    /** @var string $pass 服务器登陆密码 */
   
    public static $pass = "123456";

    /** 指定交换机 */
    public static $exchangeName = "app\service\RabbitmqService";

    /** 指定队列 */
    public static $queueName = "app\service\RabbitmqService";

    /** @var bool $enableDlx 是否开启死信队列 */
    public static $enableDlx = true;
    
    /**
     * 业务处理
     * @param array $params
     * @return int
     */
    public static function handle(array $params): int
    {
        if (!empty($params['file']) && !empty($params['content'])){
            /** 写入日志 */
            Token::writeLog($params['file'], $params['content']);
        }
        return self::ACK;
    }

    /**
     * 处理错误逻辑
     * @param \RuntimeException $exception
     * @return void
     */
    public static function error(\RuntimeException $exception)
    {
        /** 此处只是示例,不做具体逻辑处理 */
    }
    
    public static function dlxHandle(array $params):int
    {
        /** 此处只是示例,不做具体逻辑处理 */
        return self::ACK;
    }
}



namespace app\command;

use app\service\RabbitmqService;
use app\service\Token;
use think\console\Command;
use think\console\Input;
use think\console\Output;

/**
 * @purpose 异步写日志服务
 * @author yanglong
 * @time 2025年6月20日15:19:59
 * @command nohup php think check:rabbitmq >/dev/null 2>&1 &
 */
class CheckRabbitmq extends Command
{
    protected function configure()
    {
        // 指令配置
        $this->setName('check:rabbitmq');
        // 设置参数
        
    }

    protected function execute(Input $input, Output $output)
    {
        /** 开启消费者 */
        RabbitmqService::consume();
    }
}



namespace app\command;

use app\service\RabbitmqService;
use app\service\Token;
use think\console\Command;
use think\console\Input;
use think\console\Output;

/**
 * @purpose 异步写日志服务
 * @author yanglong
 * @time 2025年6月20日15:19:59
 * @command nohup php think check:rabbitmqD >/dev/null 2>&1 &
 */
class CheckRabbitmq extends Command
{
    protected function configure()
    {
        // 指令配置
        $this->setName('check:rabbitmqD');
        // 设置参数
        
    }

    protected function execute(Input $input, Output $output)
    {
        /** 开启死信消费者 */
        RabbitmqService::consumeD();
    }
}


RabbitmqService::publish(['file'=>$file, 'content'=>$content]);

 /**
  * 业务逻辑 
  */
 public static function handle(array $params): int
 {
    try{
        $db = new \mysqli("127.0.0.1", "demo", "root", "root", 3306);
        $res = $db->query("select * from users");
        while($row = $res->fetch_assoc()){
            var_dump($row);
        }
        $res->free();
        $db->close();
    }catch (\Exception $exception){

    }
    return self::ACK;
}

 /**
  * 业务逻辑 
  * @note 此处仅作为示例,请根据实际业务需求调整cmd命令 
  */
 public static function handle(array $params): int
 {
    try{
        $id = $params['id'];
        # 你可以传入任意参数,但是需要你自己在/home/index/index里手动解析这些参数
        $cmd = "timeout -s 3 10 php index.php /home/index/index --id={$id}";
        $res = shell_exec($cmd);
        var_dump($res);
    }catch (\Exception $exception){

    }
    return self::ACK;
}

 /**
  * 业务类
  * @return void
  * @note 业务本不需要如此复杂,被逼无奈,出此下策。
  */
  public function index()
  {
    # 此处将会打印出通过上面的命令传入的参数,可能需要你自己来解析这些参数,这些都是很简单的,就不写示例了。
    $argv = $_SERVER['argv'];
    var_dump($argv);
  }
bash
php consume.php
bash
php consumeD.php
bash
php publish.php
bash
php yii queue/index
bash
nohup php yii queue/index >/dev/null 2>&1 &
bash
php think check:rabbitmq
bash 
nohup php think check:rabbitmq >/dev/null 2>&1 &
bash
php think check:rabbitmqD
bash
nohup php think check:rabbitmqD >/dev/null 2>&1 &