PHP code example of volcengine / ve-rocketmq-php-sdk
1. Go to this page and download the library: Download volcengine/ve-rocketmq-php-sdk library . Choose the download type require .
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
volcengine / ve-rocketmq-php-sdk example snippets
use RMQ\Client;
// HTTP Proxy 接入点
$endpoint = "";
// 密钥
$accessKey = "";
$secretKey = "";
// 实例化客户端
$client = new Client($endpoint, $accessKey, $secretKey);
// 创建一个生产者
$producer = $client->createProducer();
use RMQ\Message;
// 目标topic
$topic ="topic_name";
// 消息的内容
$messageContent = "content."
// 实例化一个消息
$msg = new Message($topic, $messageContent);
// 设置消息的tag值
$msg->setTag("tag_a");
// 设置ShardingKey
$msg->setShardingKey("my_key");
// 设置自定义属性
$msg->putProperty("property_name", "test");
$producer->open();
$msg = new Message("topic_name", "hello!");
$messageInfo = $producer->publishMessage($msg);
$producer->close();
var_dump(messageInfo);
$groupID = ""; // 消费组ID
$consumer = $client->createConsumer($groupID, [
// 每次调用consumeMessage最多拉取12条消息
"max_message_number" => 12,
// 在消息达到max_message_number之前,请求在服务端挂起的最大等待时长(单位ms)
"max_wait_time" => 3000
]);
use RMQ\Model\MessageInfo;
$consumer = $client->createConsumer($groupID);
// 订阅topic_a 全部消息1
$consumer->subscribe("topic_a");
// 订阅topic_b tag为A的消息
$consumer->subscribe("topic_b", "A");
$consumer->open();
// 拉取消息
$messages = $consumer->consumeMessage();
$acksHandles = [];
foreach ($messages as $msg) {
$body = $msg->body;
echo "message bode: $body \n";
array_push($acksHandles, $msg->msgHandle);
}
// 确认消息的消费情况
// ackMessages第一个参数是确认消费成功的消息的msgHandle
// ackMessages第二个参数是确认消费失败的消息的msgHandle
$consumer->ackMessages($acksHandles, []);
$consumer->close();
$producer->open();
// 在open后等待60秒
sleep(60);
// 下面的方法调用会失败,因为服务端的生产者实例已超时被销毁掉
$producer->publishMessage($msg);
use RMQ\Exception\MQTokenTimeoutException;
for ($i = 0; $i < 10; $i++) {
if ($i % 2 == 0) {
sleep(60 * 10); // 偶数消息等待10分钟
}
$message = new Message("topic_name", "hello!");
try {
// 发送消息
$producer->publishMessage($message);
} catch (MQTokenTimeoutException $e) {
// token失效的情况需要重连
$producer->open();
// 对消息重发
$producer->publishMessage($message);
} catch (RuntimeException $e) {
// 其他错误情况
echo $e . "\n";
}
}
$consumer->open();
while (true) {
try {
// 拉取消息
$messages = $consumer->consumeMessage();
$acksHandles = [];
foreach ($messages as $msg) {
$body = $msg->body;
echo "message bode: $body \n";
array_push($acksHandles, $msg->msgHandle);
}
// 确认消费状态
$consumer->ackMessages($acksHandles, []);
} catch (MQTokenTimeoutException $e) {
// token失效的情况需要重连
$consumer->open();
} catch (RuntimeException $e) {
// 其他错误
echo $e;
}
}
use RMQ\Message;
$msg = new Message("topic_name", "content.");
// 消息投递的具体毫秒时间戳(当前时间延迟30秒)
$postTime = time() * 1000 + 30000;
// 将延时属性设置到Property中
$msg->putProperty("__STARTDELIVERTIME", "$currentTimeStamp");
$msg2 = new Message("topic_name", "content");
$msg2->setDelayLevel(5)
$level = 5
$msg2 = new Message("topic_name", "content");
$msg2->putProperty("__DelayTimeLevel", "$level");