Download the PHP package mustafa3264/messagebus without Composer
On this page you can find all versions of the php package mustafa3264/messagebus. It is possible to download/install these versions without Composer. Possible dependencies are resolved automatically.
Download mustafa3264/messagebus
More information about mustafa3264/messagebus
Files in mustafa3264/messagebus
Package messagebus
Short Description messagebus include kafka and redis stream for hyperf
License MIT
Informations about the package messagebus
swoole hyperf框架 消息总线
支持kafka
和redis stream
队列,生产者往消息总线中放入事件和相关数据
按照功能模块创建消费者进程,每个模块可以有多个消费者订阅多个事件,返回具体的消费者,订阅获得的事件以及与该事件相关的参数,供业务方逻辑处理
为生产者和消费者建立了连接池,支持消息的批量发送和批量接收
基于 hyperf/metric
实现对消息总线的监控,主要监控消息的延迟程度以及消息被用来处理业务逻辑耗费的时间,指标为 messagebus_job_cost
和 messagebus_job_delay
,可上报到prometheus等用于监控和告警
for example:
直播业务包含主播获得收入事件和直播开播时长事件,他们被放入消息总线中 任务考核模块有每天直播大于60分钟且累计收入大于100元 —— 这个任务本身可以看作一个消费者,消费上述两个事件进行业务逻辑的处理 该sdk会返回任务标识,直播时长事件或者收入事件的相关数据供业务方处理
使用向导
安装
composer require mustafa3264/messagebus
php bin/hyperf.php vendor:publish mustafa3264/messagebus
配置
default 部分属于公共配置,规定了消息总线是使用kafka还是redis stream
如果是kafka,如果是kafka,pool定义了 生产者
的连接池配置,host是kafka的broker地址;如果是redis,sdk使用hyperf自己实现的redis连接池,pool规定具体使用哪个redis连接池
test 部分是具体的业务模块,kafka部分定义了该消费者部分使用的连接池,consume_num
是从消息总线单次取出来的消息数,consumer_interval
是获取消息的频率
创建模块消费者进程
modelName
是消费者进程所属模块,与上面config的模块配置要求保持一致
name
消费者进程名称
nums
消费进程的数量
getConsumersWithTopicsAndEvents
函数,返回的是消费者订阅的队列和事件列表,这个逻辑根据具体业务实现,可以从配置中心获取,也可以从数据库查询生成
beforeMessageBus
该函数在消费者循环启动之前执行,用于初始化,在这里,如果使用redis队列,可以初始化消费组
handleMessage
可以拿到消费者,与之关联的事件以及该事件的参数数据,根据业务逻辑进行处理,如校验数据是否符合任务要求,更新任务完成进程,判断任务是否完成等逻辑
配置消费者进程
在process.php文件中添加该消费者进程
联系作者
我的邮箱
微信
fanghailiang2023
All versions of messagebus with dependencies
longlang/phpkafka Version ^1.2
hyperf/redis Version *
hyperf/process Version *
hyperf/metric Version *