PHP code example of huangbin2018 / dbmq
1. Go to this page and download the library: Download huangbin2018/dbmq library . Choose the download type require .
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
huangbin2018 / dbmq example snippets
// Mysql 连接配置
$dbConfig = [
'user' => 'root',
'password' => '',
'host' => '127.0.0.1',
'port' => '3306',
'database' => 'test',
];
$channel = 'test_channel';
$tag = 'test_tag';
$consumerArr = [
'consumer_1',
'consumer_2',
'consumer_3',
'consumer_4',
'consumer_5',
] ;
// 实例化消息生产者
$publisherObj = new Publisher($channel, $dbConfig);
// 定义tag
$publisherObj->declareTag($tag, TagType::TOPIC, '测试tag');
// 定义消费者
foreach ($consumerArr as $consumerKey) {
// 定义消费者
$publisherObj->declareConsumer($consumerKey, '', '测试consumerKey');
// 绑定tag
$publisherObj->bindTag($consumerKey, $tag);
}
// 发布消息
$key = 'uid_1';
$data = [
'user_id' => 1,
'user_name' => 'huangbin',
];
$message = new Message($data);
$body = $message->serialize();
$rs = $publisherObj->send($tag, $key, $body);
use DBMQ\Message\Message;
use DBMQ\Consumer\Consumer;
use DBMQ\Message\Response;
// 数据库连接参数
$dbConfig = [
'user' => 'root',
'password' => '',
'host' => '127.0.0.1',
'port' => '3306',
'database' => 'test',
];
$channel = 'test_channel';
$tag = 'test_tag';
$consumerKey = 'consumer_1';
$processSize = $argv[1] ?? 0; // 消费者进程数
$processIndex = $argv[2] ?? 0; // 进程索引
$consumerObj = new Consumer($consumerKey, $dbConfig, '', $processSize, $processIndex);
// 定义消息消费处理函数
$consumerObj->run(function (Message $message) {
$timestamp = $message->getTimestamp();
// 消息体
$body = $message->getMessage();
print_r($body);
// 这里是逻辑处理...
try {
$ack = true;
if ($ack == false) {
$msg = '测试失败啦';
return Response::isFail($msg);
} else {
$msg = '测试成功啦';
return Response::isSuccess($msg);
}
} catch (\Exception $e) {
// 异常
return Response::isException($e->getMessage());
}
});