PHP code example of mustafa3264 / messagebus

1. Go to the download page and download the mustafa3264/messagebus library. Choose the download type "require".

2. Extract the ZIP file and open index.php.

3. Add this code to index.php:
    
        
<?php
// Pull in vendor/autoload.php (the autoloader shipped with the downloaded package).
require_once 'vendor/autoload.php';

// Start developing here. Best regards https://php-download.com/

    

mustafa3264 / messagebus example snippets




declare(strict_types=1);

use Mustafa\Messagebus\Constants\MessageBusType;

// Kafka connection-pool settings used by the consumer side of the "test" module.
$consumerKafkaPool = [
    'min_connections' => 1,
    'max_connections' => 1,
    'connect_timeout' => 1,
    'wait_timeout' => 1,
    'heartbeat' => -1,
    'max_idle_time' => 60,
];

// Kafka connection-pool settings used by the producer side (global section).
// Kept separate from the consumer pool so the two can be tuned independently.
$producerKafkaPool = [
    'min_connections' => 1,
    'max_connections' => 1,
    'connect_timeout' => 1,
    'wait_timeout' => 1,
    'heartbeat' => -1,
    'max_idle_time' => 60,
];

return [
    // Business module "test".
    'test' => [
        // Consumer driver configuration.
        'drivers' => [
            // Kafka
            MessageBusType::KAFKA => [
                // Connection pool configuration.
                'pool' => $consumerKafkaPool,
                // Number of messages consumed in a single fetch.
                'consume_num' => 1,
            ],
            // Redis Streams
            MessageBusType::REDIS => [
                // Number of messages consumed in a single fetch.
                'consume_num' => 10,
            ],
        ],
        // Interval between consumption runs (unit is not stated in this file).
        'consumer_interval' => 2,
    ],
    // Global configuration.
    'default' => [
        // Default driver.
        'default_driver' => MessageBusType::REDIS,
        // Producer driver configuration.
        'drivers' => [
            // Kafka
            MessageBusType::KAFKA => [
                // Connection pool configuration.
                'pool' => $producerKafkaPool,
                // Broker address, overridable through the KAFKA_HOST environment variable.
                'host' => env('KAFKA_HOST', 'localhost:9092'),
            ],
            // Redis Streams
            MessageBusType::REDIS => [
                // Name of the Redis connection pool to use.
                'pool' => 'default',
            ],
        ],
    ],
];



namespace App\Process;

use Hyperf\Redis\RedisFactory;
use Mustafa\Messagebus\Constants\MessageBusType;
use Mustafa\Messagebus\Consumer\AbstractProcess;

/**
 * Example consumer process for the "test" message-bus module.
 *
 * It declares the consumers of the module together with the topics and events
 * each one listens to, prints every message it is handed, and - when Redis is
 * the default driver - creates the Redis Streams consumer groups in the
 * beforeMessageBus() hook.
 */
class MessagebusConsumerProcess extends AbstractProcess
{
    /** Module whose message queues this process consumes (the "test" module). */
    public string $modelName = 'test';

    /** Process name. */
    public string $name = 'test-eventbus-queme';

    /** Number of processes. */
    public int $nums = 1;

    /** Process restart interval (unit not visible here; presumably seconds - confirm against AbstractProcess). */
    protected int $restartInterval = 120;

    /**
     * Lists the consumers of this module with their subscriptions.
     *
     * @return array<string, array{topics: list<mixed>, events: list<mixed>}>
     *         Keyed by consumer name; 'topics' are the topics (queues) the
     *         consumer subscribes to, 'events' the events it is interested in.
     */
    protected function getConsumersWithTopicsAndEvents(): array
    {
        return [
            'ruby-live25' => [
                // Topics (queues) consumed by this consumer.
                'topics' => [
                    \App\Constants\KafkaTopics::TEST_TOPIC1,
                ],
                // Events this consumer is interested in.
                'events' => [
                    \App\Constants\MessageBusEvents::EVENTS1,
                ],
            ],
        ];
    }

    /**
     * Handles one message; this example simply dumps it to standard output.
     *
     * @param string $consumer Name of the consumer the message was delivered to.
     * @param string $event    Event name carried by the message.
     * @param array  $payload  Message payload.
     */
    protected function handleMessage(string $consumer, string $event, array $payload): void
    {
        print_r($consumer);
        echo PHP_EOL;
        print_r($event);
        echo PHP_EOL;
        print_r($payload);
        echo PHP_EOL;
        echo PHP_EOL;
    }

    /**
     * Creates the Redis Streams consumer groups when Redis is the default driver.
     *
     * One group named "<modelName>-<consumer>" is created per topic of every
     * consumer returned by getConsumersWithTopicsAndEvents(), starting at ID 0.
     */
    protected function beforeMessageBus()
    {
        // Nothing to prepare for drivers other than Redis.
        if ($this->defaultDriver !== MessageBusType::REDIS) {
            return;
        }

        $poolName = $this->config->get('messagebus.default.drivers.' . $this->defaultDriver . '.pool');
        $redis = $this->container->get(RedisFactory::class)->get($poolName);

        foreach ($this->getConsumersWithTopicsAndEvents() as $group => $consumerRow) {
            $groupName = $this->modelName . '-' . $group;
            foreach ($consumerRow['topics'] as $topic) {
                // MKSTREAM makes Redis create an empty stream when the key does not
                // exist yet; without it XGROUP CREATE is rejected for a missing
                // stream and the group would never be created.
                // If the group already exists Redis replies with a BUSYGROUP error.
                // NOTE(review): confirm how the configured client surfaces that reply
                // (false vs. exception) so that process restarts stay harmless.
                $redis->rawCommand('xgroup', 'create', $topic, $groupName, 0, 'MKSTREAM');
            }
        }
    }
}



declare(strict_types=1);

use App\Process\MessagebusConsumerProcess;

// List of process class names; here only the message-bus consumer process.
// NOTE(review): presumably this is Hyperf's config/autoload/processes.php - confirm.
return [
    MessagebusConsumerProcess::class,
];