PHP code example of volcengine / ve-rocketmq-php-sdk

1. Go to this page and download the library: Download volcengine/ve-rocketmq-php-sdk library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

volcengine / ve-rocketmq-php-sdk example snippets


use RMQ\Client;

//  HTTP Proxy 接入点
$endpoint = ""; 
// 密钥
$accessKey = ""; 
$secretKey = "";

// 实例化客户端
$client = new Client($endpoint, $accessKey, $secretKey);

// 创建一个生产者
$producer = $client->createProducer();

use RMQ\Message;

// 目标topic
$topic ="topic_name"; 
// 消息的内容
$messageContent = "content." 

// 实例化一个消息
$msg = new Message($topic, $messageContent);
// 设置消息的tag值
$msg->setTag("tag_a");
// 设置ShardingKey
$msg->setShardingKey("my_key");
// 设置自定义属性
$msg->putProperty("property_name", "test");

$producer->open();

$msg = new Message("topic_name", "hello!");
$messageInfo = $producer->publishMessage($msg);
$producer->close();

var_dump(messageInfo);

$groupID = ""; // 消费组ID
 
$consumer = $client->createConsumer($groupID, [
  // 每次调用consumeMessage最多拉取12条消息
  "max_message_number" => 12,
  // 在消息达到max_message_number之前,请求在服务端挂起的最大等待时长(单位ms)
  "max_wait_time"      => 3000
]);

use RMQ\Model\MessageInfo;

$consumer = $client->createConsumer($groupID);
// 订阅topic_a 全部消息1
$consumer->subscribe("topic_a");
// 订阅topic_b tag为A的消息
$consumer->subscribe("topic_b", "A");

$consumer->open();

// 拉取消息
$messages = $consumer->consumeMessage();

$acksHandles = [];
foreach ($messages as $msg) {
    $body = $msg->body;
    echo "message bode: $body \n";
    array_push($acksHandles, $msg->msgHandle);
}
// 确认消息的消费情况
// ackMessages第一个参数是确认消费成功的消息的msgHandle
// ackMessages第二个参数是确认消费失败的消息的msgHandle
$consumer->ackMessages($acksHandles, []);

$consumer->close();

$producer->open();
// 在open后等待60秒
sleep(60);
// 下面的方法调用会失败,因为服务端的生产者实例已超时被销毁掉
$producer->publishMessage($msg);

use RMQ\Exception\MQTokenTimeoutException;

for ($i = 0; $i < 10; $i++) {
  if ($i % 2 == 0) {
    sleep(60 * 10); // 偶数消息等待10分钟
  }
  
  $message = new Message("topic_name", "hello!");
  try {
    // 发送消息
    $producer->publishMessage($message);
  } catch (MQTokenTimeoutException $e) {
    // token失效的情况需要重连
    $producer->open();
    // 对消息重发
    $producer->publishMessage($message);
  } catch (RuntimeException $e) {
    // 其他错误情况
    echo $e . "\n";
  }
}

$consumer->open();

while (true) {
  try {
    // 拉取消息
    $messages    = $consumer->consumeMessage();
    $acksHandles = [];
    foreach ($messages as $msg) {
        $body = $msg->body;
        echo "message bode: $body \n";
        array_push($acksHandles, $msg->msgHandle);
    }
    // 确认消费状态
    $consumer->ackMessages($acksHandles, []);
  } catch (MQTokenTimeoutException $e) {
    // token失效的情况需要重连
    $consumer->open();
  } catch (RuntimeException $e) {
    // 其他错误 
    echo $e;
  }
}

use RMQ\Message;

$msg = new Message("topic_name", "content.");
// 消息投递的具体毫秒时间戳(当前时间延迟30秒)
$postTime = time() * 1000 + 30000; 
// 将延时属性设置到Property中
$msg->putProperty("__STARTDELIVERTIME", "$currentTimeStamp"); 

$msg2 = new Message("topic_name", "content");

$msg2->setDelayLevel(5)

$level = 5

$msg2 = new Message("topic_name", "content");

$msg2->putProperty("__DelayTimeLevel", "$level");