PHP code example of uncleqiu / hyperf-rocketmq

1. Go to the download page for the uncleqiu/hyperf-rocketmq library and choose the download type "require".

2. Extract the ZIP file and open index.php.

3. Add the following code to index.php.
    
        
<?php
// Resolve the Composer autoloader relative to this file so the include works
// regardless of the current working directory or include_path.
require_once __DIR__ . '/vendor/autoload.php';

/* Start to develop here. Best regards https://php-download.com/ */

    

uncleqiu / hyperf-rocketmq example snippets


// RocketMQ connection configuration, keyed by group name.
return [
    'default' => [ // Group name; groups are distinguished by host, port, and scheme
        'host' => env('ROCKETMQ_HTTP_HOST'),
        'access_key' => env('ROCKETMQ_HTTP_ACCESS_KEY_ID'),
        // NOTE(review): the two variables below use the prefix "ROCKET_MQ_" while the
        // two above use "ROCKETMQ_" — confirm the names match the deployed .env file.
        'secret_key' => env('ROCKET_MQ_HTTP_ACCESS_KEY_SECRET'),
        'instance_id' => env('ROCKET_MQ_HTTP_INSTANCE_ID'),
        // Connection pool settings.
        // NOTE(review): the timeout / idle values are presumably in seconds, and
        // heartbeat = -1 presumably disables the heartbeat — confirm against the pool implementation.
        'pool' => [
            'min_connections' => 50,
            'max_connections' => 300,
            'connect_timeout' => 3.0,
            'wait_timeout' => 30.0,
            'heartbeat' => -1,
            'max_idle_time' => 60.0,
        ],
    ],
];



declare(strict_types=1);

namespace App\Controller;

use App\Producer\DemoProducer;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\RequestMapping;
use Uncleqiu\HyperfRocketMQ\Producer;

/**
 * Demo controller showing how to publish a message through the RocketMQ producer.
 */
#[Controller]
class IndexController extends AbstractController
{
    /**
     * Producer client used to publish messages; populated via the #[Inject] attribute.
     */
    #[Inject(Producer::class)]
    protected Producer $producer;

    /**
     * Wrap a fixed sample payload in a DemoProducer message, publish it,
     * and reply with an empty JSON body.
     */
    #[RequestMapping('index')]
    public function index()
    {
        $payload = ['a' => 1, 'b' => 2];
        $this->producer->produce(new DemoProducer($payload));

        return $this->response->json([]);
    }
}

   // Transactional send: persist the message status together with the business
   // changes, and push the message only once that transaction has committed.
   $barProducer = new BarProducer(['test' => 12345, 'name' => '张三1231']);

   Db::beginTransaction();
   try {
       // TODO: business logic

       // Record the message status inside the same transaction as the business changes.
       $barProducer->saveMessageStatus();
       Db::commit();
   } catch (\Throwable $ex) {
       Db::rollBack();
       // Re-throw so the message below is never pushed for a rolled-back
       // transaction and the failure is not silently swallowed.
       throw $ex;
   }

   // Push the message (reached only when the transaction committed).
   $this->producer->produce($barProducer);
   

use Uncleqiu\HyperfRocketMQ\Annotation\Consumer;
use Uncleqiu\HyperfRocketMQ\Library\Model\Message as RocketMQMessage;
use Uncleqiu\HyperfRocketMQ\Message\ConsumerMessage;
use Uncleqiu\HyperfRocketMQ\Result;

/**
 * Example consumer subscribed to topic "Topic_03_test" in group "test_test",
 * filtered by the tag expression "tMsgKey||tMsgKey_bar".
 */
#[Consumer(topic: 'Topic_03_test', groupId: 'test_test', messageTag: 'tMsgKey||tMsgKey_bar')]
class DemoCounser extends ConsumerMessage
{
    /**
     * Read the tag, key, body and id from the received message, dump the body,
     * and acknowledge the message.
     *
     * @return string Always Result::ACK.
     */
    public function consumeMessage(RocketMQMessage $rocketMQMessage): string
    {
        // Message tag.
        $tag = $rocketMQMessage->getMessageTag();
        // Unique message identifier (message key).
        $key = $rocketMQMessage->getMessageKey();
        // Message body as an array.
        $payload = $rocketMQMessage->toArray();
        $messageId = $rocketMQMessage->getMessageId();

        var_dump('消费端接收到消息:', $payload);

        return Result::ACK;
    }
}
shell
php bin/hyperf.php ext-gen:rocketmq-consumer 领域名 消费者名称   
php bin/hyperf.php ext-gen:rocketmq-producer 领域名 生产者名称   
shell
php bin/hyperf.php vendor:publish uncleqiu/hyperf-rocketmq
shell
php bin/hyperf.php migrate --path=migrations/rocketmq
php

declare(strict_types=1);

namespace App\Test\Queue\Producer;

use Uncleqiu\HyperfRocketMQ\Annotation\Producer;
use Uncleqiu\HyperfRocketMQ\Message\ProducerMessage;

/**
 * Example producer message bound to topic "Topic_03_test" with tag "tMsgKey".
 */
#[Producer(topic: 'Topic_03_test', messageTag: 'tMsgKey')]
class DemoProducer extends ProducerMessage
{
    /**
     * @param array $data Message content; stored as the payload.
     */
    public function __construct(array $data)
    {
        // Message content.
        $this->setPayload($data);

        // Custom message key (one is generated automatically when none is set).
        $this->setMessageKey('xxxxx');
    }
}
shell
   php bin/hyperf.php migrate --path=migrations/rocketmq