1. Go to this page and download the library: Download alicfeng/aliyun_rocket_mq library. Choose the download type require.
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
use MQ\Model\TopicMessage;
use Samego\RocketMQ\Consumer;
use Samego\RocketMQ\Enum\MessageTagEnum;
use Samego\RocketMQ\Enum\TopicEnum;
use Samego\RocketMQ\Event\MessageEvent;
use Samego\RocketMQ\Producer;
$message = new TopicMessage(['name' => 'hello world']);
$message->putProperty('timestamp', time());
$message->setMessageTag(MessageTagEnum::TRAINING_SERVICE_TRAINING_CONTROLLER);
$message->setMessageKey('uuid');
// 普通消息发送
Producer::normal($config['client'])->publish('MQ_xxx', TopicEnum::DEMO_SERVICE, $message);
// 普通消息订阅
Consumer::normal($config['client'], new MessageEvent($config['consumer'], $config['cache']))->subscribe();
// 消费配置
'consumer' => [
// 定义承载消费处理基类命名空间
'handler_base_namespace' => 'App\\Queue\\Handler',
'topic' => env('MQ_ROCKET_CONSUMER_TOPIC'),
'message_tags' => [
'Demo'
],
'group_id' => env('MQ_ROCKET_CONSUMER_GROUP_ID'),
'instance_id' => env('MQ_ROCKET_CONSUMER_INSTANCE_ID'),
];
// 定义好了 consumer.handler_base_namespace 与 consumer.message_tags.* 则需要定义Demo消费处理类 DemoHandler
// 同时此类需要继承 Samego\RocketMQ\Contract\QueueServiceHandlerInterface 接口
namespace App\Queue\Handler;
use MQ\Model\Message;
use Samego\RocketMQ\Contract\QueueServiceHandlerInterface;
use Samego\RocketMQ\Helper\StdLogHelper;
class DemoHandler implements QueueServiceHandlerInterface
{
public function handler(Message $message): bool
{
return true;
}
public function failure(Message $message): void
{
}
}
Loading please wait ...
Before you can download the PHP files, the dependencies should be resolved. This can take some minutes. Please be patient.