PHP code example of solarseahorse / webman-redis-queue
1. Go to this page and download the library: Download solarseahorse/webman-redis-queue library . Choose the download type require .
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
solarseahorse / webman-redis-queue example snippets
return [
'default' => [
'host' => 'redis://127.0.0.1:6379',
'options' => [
'auth' => null, // 密码,字符串类型,可选参数
'db' => 0, // 数据库
'prefix' => 'webman_redis_queue_', // key 前缀
'timeout' => 2, // Timeout
'ping' => 55, // Ping
'reconnect' => true, // 断线重连
'max_retries' => 5, // 最大重连次数
'retry_interval' => 5 , // 重连间隔 s
]
],
];
return [
'enable' => true, // 启用日志
'handlers' => support\Log::channel('default') // 默认通道 default
];
LogUtility::warning('Error:', [
'data' => $consumerMessage->getData(),
'errorMessage' => $e->getMessage()
]);
return [
'send-email' => [
'handler' => SolarSeahorse\WebmanRedisQueue\Process\ConsumerProcess::class,
'count' => 20, // 在目录模式中,目录下所有队列是共用进程
'constructor' => [
// 支持目录和类 推荐使用类名
'consumer_source' => \App\queue\test\Email::class
]
]
];
namespace app\queue\test;
use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;
class SendEmail extends Consumer
{
// 连接标识,对应config/plugin/solarseahorse/webman-redis-queue/redis.php的配置
protected string $connection = 'default';
public function consume(ConsumerMessageInterface $consumerMessage)
{
// TODO: Implement consume() method.
}
}
namespace app\queue\test;
use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;
class SendCode extends Consumer
{
// 连接标识,对应config/plugin/solarseahorse/webman-redis-queue/redis.php的配置
protected string $connection = 'default';
// 消费
public function consume(ConsumerMessageInterface $consumerMessage)
{
// TODO: Implement consume() method.
// 获取消息ID
$messageId = $consumerMessage->getMessageId();
// 获取队列数据
$data = $consumerMessage->getData();
var_dump($messageId);
}
}
return array (
'sendCode' =>
array (
'handler' => 'SolarSeahorse\\WebmanRedisQueue\\Process\\ConsumerProcess',
'count' => 1,
'constructor' =>
array (
'consumer_source' => 'app\\queue\\test\\SendCode',
),
),
);
/**
* @param mixed|QueueMessageInterface $data
* @return string|bool
* @throws QueueMessagePushException
*/
public function pushMessage(string|array|int|QueueMessageInterface $data): string|bool;
// 消息内容,无需序列化
$message = [
'dummy' => 'ok'
];
// 生产者工厂方法
$messageId = QueueProducerFactory::create(app\queue\test\SendEmail::class)
->pushMessage($message);
// 通过消费类工厂方法 创建一个生产者
$messageId = app\queue\test\SendEmail::createQueueProducer()->pushMessage($message);
// 投递QueueMessage对象
$message = app\queue\test\SendEmail::createQueueMessage($message);
// 或者通过QueueMessageFactory创建一条消息
$message = QueueMessageFactory::create(app\queue\test\SendEmail::class,$message);
// 修改队列数据
$message->setData(['dummy' => 'no']);
// 设置错误次数
$message->setFailCount(3);
// 通过上方两种方法投递均可
$messageId = app\queue\test\SendEmail::createQueueProducer()->pushMessage($message);
var_export($messageId); // 返回stream的字符串ID 或 false
/**
* @param array|QueueMessageInterface[] $dataArray
* @return array|false
* @throws QueueMessagePushException
*/
public function pushMessages(array $dataArray): array|bool;
// 投递5w条消息
$dataArr = array_fill(0, 50000, null);
for ($i = 0; $i < 50000; $i++) {
$dataArr[$i] = ['dummy' => uniqid()];
}
$messageIds = app\queue\test\SendEmail::createQueueProducer()->pushMessages($dataArr);
// QueueMessage方式
for ($i = 0; $i < 50000; $i++) {
$message = QueueMessageFactory::create(app\queue\test\SendEmail::class, ['dummy' => uniqid()]);
//$message->setData(json_encode(['123']));
//$message->setFailCount(1);
// ....
$dataArr[$i] = $message;
}
$messageIds = app\queue\test\SendEmail::createQueueProducer()->pushMessages($dataArr);
var_export($messageIds); // 返回Stream消息ID列表 或 false
/**
* @param mixed|QueueMessageInterface $data
* @param int $delay
* @param string $identifier
* @return bool
* @throws ScheduleDelayedMessageException
*/
public function scheduleDelayedMessage(string|array|int|QueueMessageInterface $data, int $delay = 0, string $identifier = ''): bool;
// 消息内容
$message = [
'type' => 'warning',
'to' => '[email protected] ',
'content' => '.....'
];
// 投递一条延时消息 60秒后处理
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message, 60);
// QueueMessage对象
$message = app\queue\test\SendEmail::createQueueMessage($message);
// 设置延时
$message->setDelay(60);
// 投递一条延时消息 60秒后处理
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message);
// 使用第二个参数会替换之前对象的延时设置
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message,80);
// 消息内容
$message = [
'type' => 'warning',
'to' => '[email protected] ',
'content' => '.....'
];
// 通过type,to参数生成一个唯一ID
$identifier = md5(serialize([
'type' => 'warning',
'to' => '[email protected] ',
]));
// 投递一条延时消息 60秒后处理
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message, 60, $identifier);
// QueueMessage对象
$message = app\queue\test\SendEmail::createQueueMessage($message);
// 设置延时
$message->setDelay(60);
// 设置identifier
$message->setIdentifier($identifier);
// 投递一条延时消息 60秒后处理
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message);
// 传递参数会替换对象之前的延时和ID设置
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message, 80, $identifier);
// 投递10w条延时消息
$dataArr = array_fill(0, 100000, null);
for ($i = 0; $i < 100000; $i++) {
$dataArr[$i] = [
'delay' => 2, // 延时时间
'data' => ['dummy' => uniqid()], // 队列数据
'identifier' => '' // 自定义ID
];
}
// 批量投递
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessages($dataArr);
// QueueMessage对象
for ($i = 0; $i < 100000; $i++) {
$message = app\queue\test\SendEmail::createQueueMessage(['dummy' => uniqid()]);
// 设置延时
$message->setDelay(60);
// 设置identifier
$message->setIdentifier('');
$dataArr[$i] = $message;
}
// 批量投递
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessages($dataArr);
/**
* @param string $identifier
* @return bool
*/
public function removeDelayedMessage(string $identifier): bool;
/**
* @param array $identifiers
* @return bool|array
*/
public function removeDelayedMessages(array $identifiers): array|bool;
/**
* @param string $identifier
* @return bool
*/
public function hasDelayedMessageExists(string $identifier): bool;
/**
* @param array $identifiers
* @return bool|array
*/
public function hasDelayedMessagesExist(array $identifiers): array|bool;
$consumer = new app\queue\test\SendEmail();
$queueProducer = $consumer::createQueueProducer();
// 添加一条延时消息 通过业务数据生成消息ID
$queueProducer->scheduleDelayedMessage(['dummy' => 'ok'], 60, 'email_user_id');
// 通过QueueMessage对象
$queueMessage = $consumer::createQueueMessage(['dummy' => 'ok']);
$queueMessage->setDelay(60);
// 自定义消息ID 不设置将默认生成 通过getIdentifier()获取
$queueMessage->setIdentifier('test_id');
// 获取消息ID
$id = $queueMessage->getIdentifier();
// 判断消息是否存在
var_export(SendEmail::createQueueProducer()->hasDelayedMessageExists('identifier')); // true or false
// 移除一条延时消息
var_export(SendEmail::createQueueProducer()->removeDelayedMessage('identifier')); // true or false
// 判断多条消息是否存在 返回一个数组
var_export(SendEmail::createQueueProducer()->hasDelayedMessagesExist(['identifier1', 'identifier1', 'identifier1']));
//.array (
// 0 => 1706383223.0,
// 1 => 1706383223.0,
// 2 => false
//).
// 移除多条延时消息 返回一个数组
var_export(SendEmail::createQueueProducer()->removeDelayedMessages(['identifier1', 'identifier1', 'identifier1']));
//
//.array (
// 0 => 1,
// 1 => 1,
// 2 => 0
//).
// 检查是否超过最大重试次数
if ($queueMessage->getFailCount() >= $this->maxAttempts) {
// 死信处理
$this->handlerDeadLetterQueue($messageId, $consumerMessage, $e);
return true;
}
/**
* 处理消息挂起超时 当pending列表中有超时未ack的消息会触发此方法
* @param string $messageId
* @param ConsumerMessageInterface $consumerMessage
* @param string $consumerName
* @param int $elapsedTime
* @param int $deliveryCount
* @return void
* @throws RedisException
* @throws ScheduleDelayedMessageException
* @throws Throwable
*/
public function handlerPendingTimeoutMessages(string $messageId, ConsumerMessageInterface $consumerMessage, string $consumerName, int $elapsedTime, int $deliveryCount): void
{
switch ($this->getPendingProcessingStrategy()) {
case self::PENDING_PROCESSING_IGNORE: // 忽略pending超时
// 确认消息
$consumerMessage->ack();
// 触发死信处理
$this->handlerDeadLetterQueue($messageId, $consumerMessage, new Exception(
'PENDING_PROCESSING_IGNORE: Message pending timeout.'
));
break;
case self::PENDING_PROCESSING_RETRY: // pending超时重试
// 触发死信处理
if ($deliveryCount + 1 > $this->getMaxAttempts()) {
// ack消息
$consumerMessage->ack();
$this->handlerDeadLetterQueue(
$messageId,
$consumerMessage,
new Exception(
'PENDING_PROCESSING_RETRY: The number of message delivery times exceeds the maximum number of retries.'
));
return;
}
// 处理重试
$handlerStatus = $this->handlerFailRetry(
$messageId,
$consumerMessage,
new Exception('PENDING_PROCESSING_RETRY: Message pending timeout retry.')
);
if ($handlerStatus) {
$consumerMessage->ack();
}
break;
}
}
// 获取队列的Redis连接
$sendCode = new app\queue\test\SendCode();
$redisConnection = $sendCode->getRedisConnection();
// 使用方法和phpredis扩展一致
$redisConnection->xLen();
$redisConnection->sAdd();
// 在消费类中可以直接使用$this->getRedisConnection();
....更多
use SolarSeahorse\WebmanRedisQueue\Queue\QueueMessage;
队列的Redis连接
$sendCode = new app\queue\test\SendCode();
$redisConnection = $sendCode->getRedisConnection();
// 使用方法和phpredis扩展一致
$streamKey = $sendCode->getStreamKey();
$start = '-'; // 表示从 Stream 的最开始读取
$end = '+'; // 表示读取到 Stream 的最末尾
$count = 100; // 指定要读取的消息数量
// 读取Stream列表,不包括pending
$messages = $redisConnection->xRange($streamKey, $start, $end, $count);
$deleteMessageIds = [];
foreach ($messages as $messageId => $message) {
// 解析原始消息内容
$messageArr = QueueMessage::parseRawMessage($message);
if (!$messageArr) { // 未知消息
$deleteMessageIds[] = $messageId;
continue;
}
// 转换为QueueMessage方便操作
$queueMessage = QueueMessage::createFromArray($messageArr);
// 通过获取消息时间戳,如果消息已经存在超过1个小时 标记删除。
if (time() - $queueMessage->getTimestamp() > 3600) {
$deleteMessageIds[] = $messageId;
}
}
// 批量删除消息
$redisConnection->xDel($streamKey, $deleteMessageIds);
namespace app\queue\test;
use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;
class SendEmail extends Consumer
{
// 连接标识,对应redis.php的配置 默认default
protected string $connection = 'default';
public function consume(ConsumerMessageInterface $consumerMessage)
{
// TODO: Implement consume() method.
// 获取消息ID
$messageId = $consumerMessage->getMessageId();
// 获取队列数据
$data = $consumerMessage->getData();
// 禁用错误重试 如果消费失败将不会异常重试
$consumerMessage->disableFailRetry();
// 手动触发错误重试,此方法会调用disableFailRetry方法,所以后续报错不会再触发异常重试。
// 没有禁用错误重试的情况下,消费异常默认会调用此方法。
$consumerMessage->triggerError(new \Exception('triggerError'));
// 监听消费异常事件
$consumerMessage->onError(function (\Throwable $e, ConsumerMessageInterface $consumerMessage) {
// 这里可以处理消费异常逻辑
// 禁用错误重试
$consumerMessage->disableFailRetry();
// 添加日志等等
// 如果在消费方法中自行捕获 Throwable 此事件不会触发
});
// 业务逻辑执行完毕,ack确认消息 默认自动ack,但通常建议在业务逻辑中显式调用,比如ack失败进行事务回滚等等。
$isAcked = $consumerMessage->ack();
if (!$isAcked) {
}
// 或通过getAckStatus方法获取结果
if (!$consumerMessage->getAckStatus()) {
}
// 获取原始队列消息 QueueMessage对象
$queueMessage = $consumerMessage->getQueueMessage();
// 获取消息错误次数...
$failCount = $queueMessage->getFailCount();
// 更多...
}
}
namespace app\queue\test;
use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;
class SendEmail extends Consumer
{
// 连接标识,对应redis.php的配置 默认default
protected string $connection = 'default';
// 将pending处理策略调整为PENDING_PROCESSING_IGNORE 消息挂起超时将不会进行重试
protected int $pendingProcessingStrategy = self::PENDING_PROCESSING_IGNORE;
public function consume(ConsumerMessageInterface $consumerMessage)
{
// TODO: Implement consume() method.
// 获取消息ID
$messageId = $consumerMessage->getMessageId();
// 获取队列数据
$data = $consumerMessage->getData();
// 监听异常
$consumerMessage->onError(function (\Throwable $e){
// 记录邮件发送失败日志
});
// 禁用重试
$consumerMessage->disableFailRetry();
// 发送一封邮件 ....
// 确认消息
$consumerMessage->ack();
}
}
/**
* 处理错误重试
* @param $messageId
* @param ConsumerMessageInterface $consumerMessage
* @param Throwable $e
* @return bool
* @throws ScheduleDelayedMessageException
* @throws RedisException
* @throws Throwable
*/
public function handlerFailRetry($messageId, ConsumerMessageInterface $consumerMessage, Throwable $e): bool
{
$queueMessage = $consumerMessage->getQueueMessage();
// 检查是否超过最大重试次数
if ($queueMessage->getFailCount() >= $this->maxAttempts) {
// 死信处理
$this->handlerDeadLetterQueue($messageId, $consumerMessage, $e);
return true;
}
$queueMessage->incrementFailCount(); // Fail count + 1
// 计算下次重试时间
$retrySeconds = $queueMessage->getFailCount() * $this->retrySeconds;
// 更新下次重试时间
$queueMessage->updateNextRetry($retrySeconds);
// 设置消息延时
$queueMessage->setDelay($retrySeconds);
// 设置消息ID 避免重复任务
$queueMessage->setIdentifier($messageId);
// 重新发布至延时队列
return self::createQueueProducer()->scheduleDelayedMessage($queueMessage);
}
namespace app\queue\test;
use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;
use Throwable;
class SendEmail extends Consumer
{
// 连接标识,对应redis.php的配置 默认default
protected string $connection = 'default';
// 将pending处理策略调整为PENDING_PROCESSING_IGNORE 消息挂起超时将不会进行重试
protected int $pendingProcessingStrategy = self::PENDING_PROCESSING_IGNORE;
public function consume(ConsumerMessageInterface $consumerMessage)
{
// TODO: Implement consume() method.
// 获取消息ID
$messageId = $consumerMessage->getMessageId();
// 获取队列数据
$data = $consumerMessage->getData();
// 监听异常
$consumerMessage->onError(function (\Throwable $e){
// 记录邮件发送失败日志
});
// 禁用重试
$consumerMessage->disableFailRetry();
// 发送一封邮件 ....
// 确认消息
$consumerMessage->ack();
}
public function handlerFailRetry($messageId, ConsumerMessageInterface $consumerMessage, Throwable $e): bool
{
// 不改动原本的错误处理 也可以完全自定义实现。
parent::handlerFailRetry($messageId, $consumerMessage, $e);
// 如果队列在业务数据库中还有一个tasks表进行调度,在这里可以更新task数据 比如 错误次数+1
}
}
/**
* 处理死信 超过最大重试次数或pending超时PENDING_PROCESSING_IGNORE策略 会调用此方法
* @param $messageId
* @param ConsumerMessageInterface $consumerMessage
* @param Throwable $e
* @return void
*/
public function handlerDeadLetterQueue($messageId, ConsumerMessageInterface $consumerMessage, Throwable $e): void
{
$queueMessage = $consumerMessage->getQueueMessage();
// 添加日志
LogUtility::warning('dead_letter_queue: ', [
'messageId' => $messageId,
'message' => $queueMessage->toArray(),
'failCount' => $queueMessage->getFailCount(),
'errorMsg' => $e->getMessage(),
'trace' => $e->getTraceAsString()
]);
// 更多...
}