Download the PHP package solarseahorse/webman-redis-queue without Composer
On this page you can find all versions of the php package solarseahorse/webman-redis-queue. It is possible to download/install these versions without Composer. Possible dependencies are resolved automatically.
Download solarseahorse/webman-redis-queue
More information about solarseahorse/webman-redis-queue
Files in solarseahorse/webman-redis-queue
Package webman-redis-queue
Short Description A webman redis queue plug-in
License MIT
Informations about the package webman-redis-queue
Webman Redis Queue 插件
简介
webman-redis-queue
是为 Webman 框架设计的高效、灵活的 Redis
队列插件。利用 Redis Stream 的强大特性,该插件专注于提供可靠和高性能的消息队列解决方案,适合处理大规模的数据流和复杂的队列操作。
主要特性
- 基于 Redis Stream: 使用 Redis 最新的 Stream 数据类型,为消息队列和事件流提供优化的存储和访问。
- 自定义异常重试: 支持自定义的消息处理失败重试机制,提高消息处理的可靠性。
- 死信队列处理: 集成死信队列管理,确保消息不会因处理失败而丢失。
- 延时队列支持: 实现延时消息处理,使得定时任务和延迟执行变得简单易行。
- 高效的异常处理机制: 强化的异常处理策略,确保队列的稳定运行。
安装
通过 Composer 安装 webman-redis-queue
:
版本变更记录
v1.0.1 (20240128)
新增功能
-
删除延时消息: 新增
removeDelayedMessage
方法,允许移除一条延时消息。 -
批量删除延时消息: 新增
removeDelayedMessages
方法,允许一次性移除多个指定的延时消息。 -
检查延时消息存在性: 新增
hasDelayedMessageExists
方法,用于检查延时消息是否存在。 - 批量检查延时消息存在性:
新增
hasDelayedMessagesExist
方法,用于批量检查多个延时消息是否存在。
异常处理
- 引入新的异常类型:
为延时消息的移除和存在性检查操作引入了
DelayedMessageRemoveException
和DelayedMessageCheckException
异常类型。
文档修正
- 修正文档中的几处错误: 对插件的官方文档进行了更新,修正了之前版本中存在的一些描述不准确和排版错误。
测试和反馈
我们非常欢迎并鼓励您在测试环境中尝试这个插件,并且分享您的使用体验。您的反馈对我们改进插件、修复潜在的问题以及发布未来的稳定版本非常重要。如果您在使用过程中遇到任何问题或有任何建议,请通过 GitHub Issues 与我联系。
参与贡献
如果您对改进 webman-redis-queue 有兴趣,欢迎任何形式的贡献,包括但不限于:提交问题、提供反馈、或直接向代码库提交改进。您的贡献将帮助我们更快地推出稳定、功能丰富的正式版本。
配置
配置文件自动生成在 config/plugin/solarseahorse/webman-redis-queue目录下。
1. Redis配置 redis.php
在webman集群下,每个节点需要连接同一个redis。
断线重连
注意:开启此选项能增加队列运行稳定性,但如果队列进程过多,redis恢复后可能造成突发大量连接数,因为每个进程都有一个redis连接。
默认开启,当Redis发生重载
,重启
等情况会尝试重连,超过最大重试次数后会报错并重启进程(webman默认行为)。
2. 日志配置 log.php
推荐为插件配置单独日志通道,参考链接 webman日志
在队列消费业务逻辑中可以这样使用日志,使用方法和官方的Log
类使用方法一致。
4. 队列配置 process.php
- 在加载类名模式下,每个队列都拥有独立的运行进程。
- 每个队列的配置和数据存储KEY都是独立的。
- 不推荐目录模式是因为多个队列共享进程,其中某个队列出现异常可能影响到其他队列。
- 队列的详细配置都在消费类中配置,配置文件只是基本的进程配置。
定义消费类
插件对消费类对位置没有固定要求,符合加载规范即可。
教程以app/queue/SendEmail.php
举例,目录和文件需自行创建。
继承 SolarSeahorse\WebmanRedisQueue\Consumer
,配置连接标识,并实现抽象方法consume
, 一个最基础的消费类就创建好了。
编写完成后需要在队列配置文件
process.php
中新增队列配置。
通过命令行创建
通过 php webman solar:make:consumer
命令可快速创建一个消费类。
示例操作:
最终将会在 app/queue/test
目录中创建 SendCode.php
文件。
队列配置文件process.php
也会自动更新。
配置属性
protected string $connection = 'default';
- 连接标识,用于指定 Redis 连接配置。
protected string $queueName = '';
- 队列名称,默认自动生成。
protected string $groupName = '';
- 队列分组名,默认自动生成。
protected string $streamKey = '';
- Stream key,默认自动生成。
protected int $prefetchCount = 1;
- 返回消息的最大数量。默认为1 不建议修改
- 消费速度可通过提高进程数并行处理消息,消费者每次读取多条数据是循环消费,极端情况如循环消费一半进程重启会造成大量消息挂起。
protected int $blockTime = 5000;
- 当无消息时堵塞等待的毫秒数,也可作为无消息时的休眠时长。如果队列以延时队列为主,应与延时队列间隔相近。
protected float $consumerTimerInterval = 0.5;
- 消费者处理间隔,消费完一条消息后的等待时间(秒)。
protected int $maxAttempts = 5;
- 消费失败后的最大重试次数。
protected int $retrySeconds = 60;
- 重试间隔(秒)。
protected bool $autoAck = true;
- 是否自动确认消息。开启的同时同样建议在业务逻辑中显式调用
ack
方法。
protected bool $autoDel = true;
- 是否自动删除已确认成功的消息。
protected int $delayedQueueOnceHandlerCount = 128;
- 延时队列每次处理数量,根据生产速率适当配置。
protected int $delayedMessagesTimerInterval = 1;
- 延时消息处理间隔(秒)。
protected int $delayedMessagesMaxWorkerCount = 1;
- 延时队列最大进程数,默认单线程,只会在一个进程开启延时队列处理。
protected string $delayedTaskSetKey = '';
- 延时队列 SET KEY,默认自动生成。
protected string $delayedDataHashKey = '';
- 延时队列 HASH KEY,默认自动生成。
protected int $pendingProcessingStrategy = self::PENDING_PROCESSING_RETRY;
- 消息挂起超时处理策略。
PENDING_PROCESSING_RETRY
或PENDING_PROCESSING_IGNORE
。 PENDING_PROCESSING_RETRY
当消息挂起超时会进行异常重试。PENDING_PROCESSING_IGNORE
当消息挂起超时时,触发死信处理
方便排查错误,除此之外只清理pending
列表,不做其他处理。- 默认
PENDING_PROCESSING_RETRY
, 根据队列场景选择合适的处理策略,比如发送短信验证码
,当系统出现了崩溃等情况,恢复上线时,一般情况下这类消息时不需要恢复,此时重新给用户发送验证码没有意义,但因为Redis Stream
特性,未ack的消息会在pending
列表中不会丢失,这类场景就适合配置PENDING_PROCESSING_IGNORE
protected int $pendingTimout = 300;
- 消息挂起超时时间(秒)。
- 在Redis Stream中当消息被消费者读取,但没有确认(ACK)时,消息会处于挂起状态进入
pending
列表。 - 如果消息处理缓慢,此值应尽可能调大,避免将正常处理的消息当成超时处理掉。
protected int $checkPendingTimerInterval = 60;
- 检查 pending 列表的间隔时间(秒)。
protected int $onceCheckPendingCount = 50;
- 每次检查 pending 列表的消息数量。
投递消息
通过pushMessage
方法可快速向队列投递一条消息。
有时候我们需要一次投递大量队列时,可以通过pushMessages
方法,批量投递消息,此方法会开启Redis
的pipeline
管道投递,提高与redis
的交互性能。
数组投递实际是通过数组创建一个
QueueMessage
对象
延时消息
延时消息的作用:
-
定时任务: 延时消息可以用来实现定时任务。例如,你可能想在未来的某个时间点执行特定操作,如发送提醒、更新状态等。
-
延迟处理: 在某些情况下,立即处理消息并不理想或可能。延时消息允许应用程序延迟处理,直到最合适的时机。
-
限流: 延时消息可以帮助对系统内部的请求进行限流,防止在短时间内因大量请求而过载。
- 解耦和异步处理: 在复杂的系统中,延时消息可以用来解耦不同组件间的直接交互,提高系统的可扩展性和维护性。
通过 scheduleDelayedMessage
方法快速投递一条延时消息。
如果我们想避免消息被重复发送等情况,通过延时队列的特性可以很简单实现。通过scheduleDelayedMessage
方法的第三个参数identifier
传递一个自定义的延时消息ID,同样的消息ID,消息将会被替换,延时时间从修改开始重新计算。
如果消息已经进入stream队列将无法实现替换,必须在延时时间内,类似实现一个“防抖”效果,消息在时间段内发送多次最终只处理一次。
当一次需要投递大量延时消息时,可以通过scheduleDelayedMessages
方法发送。
多redis只需要在队列配置
connection
连接标识,投递方式没有任何变化。
移除延时队列消息
新功能 (v1.0.1)
以下功能在插件的
v1.0.1
版本中新增。
有时候我们想移除某个或多个延时队列时,可以使用removeDelayedMessage
和removeDelayedMessages
方法实现,使用hasDelayedMessageExists
和hasDelayedMessagesExist
判断一条或多条延时消息是否存在。
只有任务还存在延时队列中才能移除,如果已经进入
Stream
队列中将无法移除。
代码示例:
消费消息
消费消息时会调用消费类的consume
方法,并传递一个实现ConsumerMessageInterface
接口对象。
上方示例主要演示可调用的方法,下面使用一个更加贴合实际的demo,更快了解消费业务逻辑的编写。
发送邮件验证码
场景特点:获取验证码的操作一般由用户手动触发,在这类场景中,错误重试应用户在前端UI倒计时结束后重新手动发起,如果业务出现崩溃,再次上线后重新发送验证码给用户已经没有意义了。我们可以通过配置适应这类场景,代码示例:
自定义错误重试
消费类继承的抽象类Consumer
默认实现了handlerFailRetry
方法,在触发异常重试时,会调用此方法,如果您想自定义错误重试逻辑,或加入更多自定义的处理,在本插件中可以轻松实现,并且每个队列都支持自定义配置。
默认实现的代码如上,我们只需要重写此方法就可以自定义错误处理的业务逻辑。
代码示例:
自定义死信处理
在handlerFailRetry
方法中,默认有这一段:
那么,我们如果需要自定义死信处理或加入额外的业务逻辑可以通过重写handlerDeadLetterQueue
方法实现。
protected int $pendingProcessingStrategy = self::PENDING_PROCESSING_IGNORE;
当我们设置pending处理策略为PENDING_PROCESSING_IGNORE
时,消息如果挂起超时,将不会触发异常重试,而是直接调用死信处理。默认情况下,死信处理会新增一条日志,方便排查问题。
默认情况下需要配置有效的日志(log.php) 默认行为才有效。也可以通过重写方法完全自行实现,记录在业务的数据库中,这也是推荐的做法,可以针对业务实现更加灵活的异常处理。
代码示例:
自定义pending超时处理
抽象类Consumer
中,默认定义了handlerPendingTimeoutMessages
方法,用于处理pending超时的消息。
消费者读取了一条消息后,消息会进入pending
列表,不会被当前和其他消费者再次读取,当业务逻辑没有执行完毕,服务出现掉线,崩溃时,消息并没有ack
,消息会一直保存在pending
列表中,pending
列表只能通过ack
移除,如果长期不处理,可能造成pending
列表堆积,造成大量内存占用,当持续时间大于$pendingTimout
属性的时间(默认300秒),会调用此方法进行处理。
默认情况下,在
PENDING_PROCESSING_IGNORE
策略中,我们认为pending超时消息是死信,不会再次处理,PENDING_PROCESSING_RETRY
会进行异常重试。
获取队列的redis连接
有时候我们需要操作或维护队列时,可以直接获取队列的Redis连接进行操作,比如编写自定义脚本等。
命令行
php webman solar:make:consumer
- 创建一个消费者
- 它将引导你创建一个基本的消费者类
php webman solar:remove:consumer
- 移除一个消费者
- 它将引导你移除消费者类
- 注意:它会移除redis中关于此消费者的所有数据,如果你只是想移除类和配置,请不要使用此命令。
php webman solar:clean:redis:data
- 清理某个消费者的Redis数据
- 它将引导你清理redis数据
- 注意:它将删除redis中关于此消费者的所有数据,请谨慎操作。
php webman solar:consumer:list
- 获取当前全部消费者信息,包含如下信息:
Key
标识Handler
进程类Count
进程数Consumer
消费者类名Stream Length
当前队列总长度(不包含Pending列表中的数量)Delay Set Length
当前延时队列任务数Pending List Length
当前Pending列表长度
处理历史消息
使用场景:
- 在极端情况下业务执行完毕并且ack成功,但是删除消息时出现异常,消息保留在
stream
中,一般少量数据时我们无需在意,但如果堆积数量过大可能造成内存占用和性能问题。 - 当你需要处理历史消息,或者重新处理之前已经处理过的消息。
- 当你需要对 Stream 中的历史数据进行分析或生成报告。
当
autoDel
属性为true
时,消息会自动删除,无法对历史数据进行处理和分析,如果业务需要对历史队列消息进行回溯请设置为false
代码示例:
这里我们使用了webman
中自定义脚本
的编写,可以将脚本加入定时任务中,定期清理或处理历史消息。
下方代码只是示例,请确保在测试环境充分测试。
在其他项目投递消息
目前插件没有实现在其他项目投递的标准实现,可通过业务需求开发队列提交接口实现。