PHP code example of mustafa3264 / messagebus
1. Go to this page and download the library: Download the mustafa3264/messagebus library. Choose the download type "require".
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
mustafa3264 / messagebus example snippets
declare(strict_types=1);

// Import the constants class directly instead of aliasing the parent namespace:
// the alias `Messagebus` only matched the `MessageBus\...` references below
// because PHP resolves namespace names case-insensitively.
use Mustafa\Messagebus\Constants\MessageBusType;

// Message-bus configuration: one section per business module plus a global
// "default" section.
return [
    'test' => [ // business module
        'drivers' => [ // consumer driver settings
            MessageBusType::KAFKA => [ // Kafka
                'pool' => [ // connection pool settings
                    'min_connections' => 1,
                    'max_connections' => 1,
                    'connect_timeout' => 1,
                    'wait_timeout' => 1,
                    'heartbeat' => -1,
                    'max_idle_time' => 60,
                ],
                'consume_num' => 1, // number of messages consumed per fetch
            ],
            MessageBusType::REDIS => [ // Redis stream
                'consume_num' => 10, // number of messages consumed per fetch
            ],
        ],
        'consumer_interval' => 2, // how often to consume (presumably seconds — TODO confirm)
    ],
    'default' => [ // global settings
        'default_driver' => MessageBusType::REDIS, // default driver
        'drivers' => [ // producer driver settings
            MessageBusType::KAFKA => [ // Kafka
                'pool' => [ // connection pool settings
                    'min_connections' => 1,
                    'max_connections' => 1,
                    'connect_timeout' => 1,
                    'wait_timeout' => 1,
                    'heartbeat' => -1,
                    'max_idle_time' => 60,
                ],
                // Broker address; read from KAFKA_HOST, falling back to localhost:9092.
                'host' => env('KAFKA_HOST', 'localhost:9092'),
            ],
            MessageBusType::REDIS => [ // Redis stream
                'pool' => 'default', // name of the Redis connection pool to use
            ],
        ],
    ],
];
namespace App\Process;
use Hyperf\Redis\RedisFactory;
use Mustafa\Messagebus\Constants\MessageBusType;
use Mustafa\Messagebus\Consumer\AbstractProcess;
/**
 * Example consumer process for the "test" module's message bus.
 *
 * Declares which consumers exist, which topics and events each one follows,
 * how a received message is handled, and (for the Redis driver) prepares the
 * stream consumer groups before consumption starts.
 */
class MessagebusConsumerProcess extends AbstractProcess
{
    /** Module whose message queues this process consumes. */
    public string $modelName = 'test';

    /** Process name. */
    public string $name = 'test-eventbus-queme';

    /** Number of processes. */
    public int $nums = 1;

    /** Process restart interval (presumably seconds — TODO confirm against AbstractProcess). */
    protected int $restartInterval = 120;

    /**
     * Lists the module's consumers together with the topics (queues) each one
     * subscribes to and the events it is interested in.
     *
     * @return array<string, array{topics: array, events: array}> keyed by consumer name
     */
    protected function getConsumersWithTopicsAndEvents(): array
    {
        return [
            'ruby-live25' => [
                // Topics this consumer reads from.
                'topics' => [
                    \App\Constants\KafkaTopics::TEST_TOPIC1,
                ],
                // Events this consumer cares about.
                'events' => [
                    \App\Constants\MessageBusEvents::EVENTS1,
                ],
            ],
        ];
    }

    /**
     * Handles one message; this example just dumps the consumer name, the
     * event and the payload to standard output, each followed by a newline,
     * then prints one extra blank line.
     */
    protected function handleMessage(string $consumer, string $event, array $payload): void
    {
        foreach ([$consumer, $event, $payload] as $part) {
            print_r($part);
            echo PHP_EOL;
        }
        echo PHP_EOL;
    }

    /**
     * Creates the Redis stream consumer groups when the default driver is Redis.
     *
     * One group named "<modelName>-<consumer>" is created per subscribed topic,
     * starting at ID 0 so the group sees the stream from its beginning.
     *
     * @return void
     */
    protected function beforeMessageBus()
    {
        // Only Redis streams need consumer groups to be set up here.
        if ($this->defaultDriver !== MessageBusType::REDIS) {
            return;
        }

        $poolName = $this->config->get('messagebus.default.drivers.' . $this->defaultDriver . '.pool');
        $redis = $this->container->get(RedisFactory::class)->get($poolName);

        foreach ($this->getConsumersWithTopicsAndEvents() as $group => $consumerRow) {
            $groupName = $this->modelName . '-' . $group;
            foreach ($consumerRow['topics'] as $topic) {
                // MKSTREAM makes Redis create the stream when it does not exist yet;
                // without it XGROUP CREATE is rejected for a missing key and the
                // group is never created if the consumer starts before any producer.
                // NOTE(review): when the group already exists Redis replies with a
                // BUSYGROUP error; the result is not checked here, as in the original.
                $redis->rawCommand('xgroup', 'create', $topic, $groupName, 0, 'MKSTREAM');
            }
        }
    }
}
declare(strict_types=1);

// Registers the message-bus consumer process.
// NOTE(review): presumably this is the application's process list
// (config/autoload/processes.php in Hyperf) — confirm the target file.
return [
    \App\Process\MessagebusConsumerProcess::class,
];