Download the PHP package solarseahorse/webman-redis-queue without Composer

On this page you can find all versions of the php package solarseahorse/webman-redis-queue. It is possible to download/install these versions without Composer. Possible dependencies are resolved automatically.

FAQ

After the download, you have to make one include require_once('vendor/autoload.php');. After that you have to import the classes with use statements.

Example:
If you use only one package a project is not needed. But if you use more then one package, without a project it is not possible to import the classes with use statements.

In general, it is recommended to use always a project to download your libraries. In an application normally there is more than one library needed.
Some PHP packages are not free to download and because of that hosted in private repositories. In this case some credentials are needed to access such packages. Please use the auth.json textarea to insert credentials, if a package is coming from a private repository. You can look here for more information.

  • Some hosting areas are not accessible by a terminal or SSH. Then it is not possible to use Composer.
  • To use Composer is sometimes complicated. Especially for beginners.
  • Composer needs much resources. Sometimes they are not available on a simple webspace.
  • If you are using private repositories you don't need to share your credentials. You can set up everything on our site and then you provide a simple download link to your team member.
  • Simplify your Composer build process. Use our own command line tool to download the vendor folder as binary. This makes your build process faster and you don't need to expose your credentials for private repositories.
Please rate this library. Is it a good library?

Informations about the package webman-redis-queue

Webman Redis Queue 插件

简介

webman-redis-queue 是为 Webman 框架设计的高效、灵活的 Redis 队列插件。利用 Redis Stream 的强大特性,该插件专注于提供可靠和高性能的消息队列解决方案,适合处理大规模的数据流和复杂的队列操作。

主要特性

安装

通过 Composer 安装 webman-redis-queue

版本变更记录

v1.0.1 (20240128)

新增功能

异常处理

文档修正

测试和反馈

我们非常欢迎并鼓励您在测试环境中尝试这个插件,并且分享您的使用体验。您的反馈对我们改进插件、修复潜在的问题以及发布未来的稳定版本非常重要。如果您在使用过程中遇到任何问题或有任何建议,请通过 GitHub Issues 与我联系。

参与贡献

如果您对改进 webman-redis-queue 有兴趣,欢迎任何形式的贡献,包括但不限于:提交问题、提供反馈、或直接向代码库提交改进。您的贡献将帮助我们更快地推出稳定、功能丰富的正式版本。

配置

配置文件自动生成在 config/plugin/solarseahorse/webman-redis-queue目录下。

1. Redis配置 redis.php

在webman集群下,每个节点需要连接同一个redis。

断线重连

注意:开启此选项能增加队列运行稳定性,但如果队列进程过多,redis恢复后可能造成突发大量连接数,因为每个进程都有一个redis连接。

默认开启,当Redis发生重载,重启等情况会尝试重连,超过最大重试次数后会报错并重启进程(webman默认行为)。

2. 日志配置 log.php

推荐为插件配置单独日志通道,参考链接 webman日志

在队列消费业务逻辑中可以这样使用日志,使用方法和官方的Log类使用方法一致。

4. 队列配置 process.php

  1. 在加载类名模式下,每个队列都拥有独立的运行进程。
  2. 每个队列的配置和数据存储KEY都是独立的。
  3. 不推荐目录模式是因为多个队列共享进程,其中某个队列出现异常可能影响到其他队列。
  4. 队列的详细配置都在消费类中配置,配置文件只是基本的进程配置。

定义消费类

插件对消费类对位置没有固定要求,符合加载规范即可。

教程以app/queue/SendEmail.php举例,目录和文件需自行创建。

继承 SolarSeahorse\WebmanRedisQueue\Consumer,配置连接标识,并实现抽象方法consume, 一个最基础的消费类就创建好了。

编写完成后需要在队列配置文件process.php中新增队列配置。

通过命令行创建

通过 php webman solar:make:consumer 命令可快速创建一个消费类。

示例操作:

最终将会在 app/queue/test 目录中创建 SendCode.php 文件。

队列配置文件process.php也会自动更新。

配置属性

protected string $connection = 'default';

protected string $queueName = '';

protected string $groupName = '';

protected string $streamKey = '';

protected int $prefetchCount = 1;

protected int $blockTime = 5000;

protected float $consumerTimerInterval = 0.5;

protected int $maxAttempts = 5;

protected int $retrySeconds = 60;

protected bool $autoAck = true;

protected bool $autoDel = true;

protected int $delayedQueueOnceHandlerCount = 128;

protected int $delayedMessagesTimerInterval = 1;

protected int $delayedMessagesMaxWorkerCount = 1;

protected string $delayedTaskSetKey = '';

protected string $delayedDataHashKey = '';

protected int $pendingProcessingStrategy = self::PENDING_PROCESSING_RETRY;

protected int $pendingTimout = 300;

protected int $checkPendingTimerInterval = 60;

protected int $onceCheckPendingCount = 50;

投递消息

通过pushMessage方法可快速向队列投递一条消息。

有时候我们需要一次投递大量队列时,可以通过pushMessages方法,批量投递消息,此方法会开启Redispipeline 管道投递,提高与redis的交互性能。

数组投递实际是通过数组创建一个QueueMessage对象

延时消息

延时消息的作用:

  1. 定时任务: 延时消息可以用来实现定时任务。例如,你可能想在未来的某个时间点执行特定操作,如发送提醒、更新状态等。

  2. 延迟处理: 在某些情况下,立即处理消息并不理想或可能。延时消息允许应用程序延迟处理,直到最合适的时机。

  3. 限流: 延时消息可以帮助对系统内部的请求进行限流,防止在短时间内因大量请求而过载。

  4. 解耦和异步处理: 在复杂的系统中,延时消息可以用来解耦不同组件间的直接交互,提高系统的可扩展性和维护性。

通过 scheduleDelayedMessage 方法快速投递一条延时消息。

如果我们想避免消息被重复发送等情况,通过延时队列的特性可以很简单实现。通过scheduleDelayedMessage 方法的第三个参数identifier传递一个自定义的延时消息ID,同样的消息ID,消息将会被替换,延时时间从修改开始重新计算。

如果消息已经进入stream队列将无法实现替换,必须在延时时间内,类似实现一个“防抖”效果,消息在时间段内发送多次最终只处理一次。

当一次需要投递大量延时消息时,可以通过scheduleDelayedMessages方法发送。

多redis只需要在队列配置connection连接标识,投递方式没有任何变化。

移除延时队列消息

新功能 (v1.0.1)

以下功能在插件的 v1.0.1 版本中新增。

有时候我们想移除某个或多个延时队列时,可以使用removeDelayedMessageremoveDelayedMessages 方法实现,使用hasDelayedMessageExistshasDelayedMessagesExist判断一条或多条延时消息是否存在。

只有任务还存在延时队列中才能移除,如果已经进入Stream队列中将无法移除。

代码示例:

消费消息

消费消息时会调用消费类的consume方法,并传递一个实现ConsumerMessageInterface接口对象。

上方示例主要演示可调用的方法,下面使用一个更加贴合实际的demo,更快了解消费业务逻辑的编写。

发送邮件验证码

场景特点:获取验证码的操作一般由用户手动触发,在这类场景中,错误重试应用户在前端UI倒计时结束后重新手动发起,如果业务出现崩溃,再次上线后重新发送验证码给用户已经没有意义了。我们可以通过配置适应这类场景,代码示例:

自定义错误重试

消费类继承的抽象类Consumer默认实现了handlerFailRetry 方法,在触发异常重试时,会调用此方法,如果您想自定义错误重试逻辑,或加入更多自定义的处理,在本插件中可以轻松实现,并且每个队列都支持自定义配置。

默认实现的代码如上,我们只需要重写此方法就可以自定义错误处理的业务逻辑。

代码示例:

自定义死信处理

handlerFailRetry方法中,默认有这一段:

那么,我们如果需要自定义死信处理或加入额外的业务逻辑可以通过重写handlerDeadLetterQueue方法实现。

protected int $pendingProcessingStrategy = self::PENDING_PROCESSING_IGNORE; 当我们设置pending处理策略为PENDING_PROCESSING_IGNORE 时,消息如果挂起超时,将不会触发异常重试,而是直接调用死信处理。默认情况下,死信处理会新增一条日志,方便排查问题。

默认情况下需要配置有效的日志(log.php) 默认行为才有效。也可以通过重写方法完全自行实现,记录在业务的数据库中,这也是推荐的做法,可以针对业务实现更加灵活的异常处理。

代码示例:

自定义pending超时处理

抽象类Consumer中,默认定义了handlerPendingTimeoutMessages方法,用于处理pending超时的消息。

消费者读取了一条消息后,消息会进入pending 列表,不会被当前和其他消费者再次读取,当业务逻辑没有执行完毕,服务出现掉线,崩溃时,消息并没有ack,消息会一直保存在pending 列表中,pending列表只能通过ack移除,如果长期不处理,可能造成pending 列表堆积,造成大量内存占用,当持续时间大于$pendingTimout属性的时间(默认300秒),会调用此方法进行处理。

默认情况下,在PENDING_PROCESSING_IGNORE策略中,我们认为pending超时消息是死信,不会再次处理,PENDING_PROCESSING_RETRY 会进行异常重试。

获取队列的redis连接

有时候我们需要操作或维护队列时,可以直接获取队列的Redis连接进行操作,比如编写自定义脚本等。

命令行

php webman solar:make:consumer

php webman solar:remove:consumer

php webman solar:clean:redis:data

php webman solar:consumer:list

处理历史消息

使用场景:

  1. 在极端情况下业务执行完毕并且ack成功,但是删除消息时出现异常,消息保留在stream中,一般少量数据时我们无需在意,但如果堆积数量过大可能造成内存占用和性能问题。
  2. 当你需要处理历史消息,或者重新处理之前已经处理过的消息。
  3. 当你需要对 Stream 中的历史数据进行分析或生成报告。

autoDel属性为true 时,消息会自动删除,无法对历史数据进行处理和分析,如果业务需要对历史队列消息进行回溯请设置为false

代码示例:

这里我们使用了webman自定义脚本 的编写,可以将脚本加入定时任务中,定期清理或处理历史消息。

下方代码只是示例,请确保在测试环境充分测试。

在其他项目投递消息

目前插件没有实现在其他项目投递的标准实现,可通过业务需求开发队列提交接口实现。


All versions of webman-redis-queue with dependencies

PHP Build Version
Package Version
Requires php Version >=8.0
ext-redis Version *
ext-pdo Version *
webman/console Version ^1.3
Composer command for our command line client (download client) This client runs in each environment. You don't need a specific PHP version etc. The first 20 API calls are free. Standard composer command

The package solarseahorse/webman-redis-queue contains the following files

Loading the files please wait ....