PHP code example of alicfeng / aliyun_rocket_mq

1. Go to this page and download the library: Download the alicfeng/aliyun_rocket_mq library. Choose the download type "require".

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
// Load Composer's autoloader. The path is anchored to __DIR__ so the include
// does not depend on the current working directory or the include_path setting.
require_once __DIR__ . '/vendor/autoload.php';

/* Start to develop here. Best regards https://php-download.com/ */

    

alicfeng / aliyun_rocket_mq example snippets



// Example configuration with three sections: client, consumer and cache.
// NOTE(review): env() is not defined in this snippet — presumably a
// Laravel-style helper taking (name, default); confirm in the host application.
$config = [
    // Passed as the first argument to Producer::normal() and Consumer::normal().
    'client' => [
        'endpoint' => env('MQ_ROCKET_CLIENT_ENDPOINT'),
        'access_key' => env('MQ_ROCKET_CLIENT_ACCESS_KEY'),
        'secret_key' => env('MQ_ROCKET_CLIENT_SECRET_KEY'),
    ],

    // Passed as the first argument to the MessageEvent constructor.
    'consumer' => [
        'handler_base_namespace' => env('MQ_ROCKET_CONSUMER_HANDLER_BASE_NAMESPACE'),
        'topic' => env('MQ_ROCKET_CONSUMER_TOPIC'),
        // No message tags are listed in this example.
        'message_tags' => [],
        'group_id' => env('MQ_ROCKET_CONSUMER_GROUP_ID'),
        'instance_id' => env('MQ_ROCKET_CONSUMER_INSTANCE_ID'),
    ],

    // Passed as the second argument to the MessageEvent constructor.
    // Connection values are read from the REDIS_* environment variables.
    'cache' => [
        'enable' => env('MQ_ROCKET_CACHE_ENABLE', true),
        'host' => env('REDIS_HOST', '127.0.0.1'),
        'password' => env('REDIS_PASSWORD', null),
        'port' => env('REDIS_PORT', '6379'),
        'database' => env('REDIS_DB', '0'),
    ],
];

use MQ\Model\TopicMessage;
use Samego\RocketMQ\Consumer;
use Samego\RocketMQ\Enum\MessageTagEnum;
use Samego\RocketMQ\Enum\TopicEnum;
use Samego\RocketMQ\Event\MessageEvent;
use Samego\RocketMQ\Producer;

// Build the message: the constructor argument (presumably the message body —
// confirm against TopicMessage), one custom property, a tag and a key.
$message = new TopicMessage(['name' => 'hello world']);
$message->putProperty('timestamp', time());
$message->setMessageTag(MessageTagEnum::TRAINING_SERVICE_TRAINING_CONTROLLER);
$message->setMessageKey('uuid');

// Publish a normal message.
Producer::normal($config['client'])
    ->publish('MQ_xxx', TopicEnum::DEMO_SERVICE, $message);

// Subscribe to normal messages.
Consumer::normal(
    $config['client'],
    new MessageEvent($config['consumer'], $config['cache'])
)->subscribe();

// Consumer configuration.
// Written as an assignment so the snippet is valid PHP on its own; the bare
// `'consumer' => [...]` form only parses inside an enclosing array literal.
$config['consumer'] = [
    // Base namespace that holds the consumer handler classes.
    'handler_base_namespace' => 'App\\Queue\\Handler',
    'topic'                  => env('MQ_ROCKET_CONSUMER_TOPIC'),
    'message_tags'           => [
        'Demo',
    ],
    'group_id'               => env('MQ_ROCKET_CONSUMER_GROUP_ID'),
    'instance_id'            => env('MQ_ROCKET_CONSUMER_INSTANCE_ID'),
];

// Once consumer.handler_base_namespace and consumer.message_tags.* are defined, you must define the handler class for the "Demo" tag: DemoHandler.
// This class must also implement the Samego\RocketMQ\Contract\QueueServiceHandlerInterface interface.

namespace App\Queue\Handler;
use MQ\Model\Message;
use Samego\RocketMQ\Contract\QueueServiceHandlerInterface;
use Samego\RocketMQ\Helper\StdLogHelper;

/**
 * Example consumer handler for the "Demo" message tag.
 *
 * NOTE(review): the library presumably resolves this class from
 * consumer.handler_base_namespace plus the tag name — confirm against the
 * package documentation.
 */
class DemoHandler implements QueueServiceHandlerInterface
{
    /**
     * Handles a single message.
     *
     * @param Message $message The message supplied by the caller.
     *
     * @return bool Always true in this example. Presumably signals that the
     *              message was processed — confirm against the interface contract.
     */
    public function handler(Message $message): bool
    {
        return true;
    }

    /**
     * Failure hook; this example takes no action.
     *
     * NOTE(review): presumably invoked when handling a message fails — confirm
     * against QueueServiceHandlerInterface.
     *
     * @param Message $message The message supplied by the caller.
     */
    public function failure(Message $message): void
    {
    }
}