PHP code example of solarseahorse / webman-redis-queue

1. Go to this page and download the library: Download solarseahorse/webman-redis-queue library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

solarseahorse / webman-redis-queue example snippets



return [
    'default' => [
        'host' => 'redis://127.0.0.1:6379',
        'options' => [
            'auth' => null,       // 密码,字符串类型,可选参数
            'db' => 0,            // 数据库
            'prefix' => 'webman_redis_queue_',      // key 前缀
            'timeout' => 2, // Timeout
            'ping' => 55,             // Ping
            'reconnect' => true,  // 断线重连
            'max_retries' => 5, // 最大重连次数
            'retry_interval' => 5 , // 重连间隔 s
        ]
    ],
];




return [
    'enable' => true, // 启用日志
    'handlers' => support\Log::channel('default') // 默认通道 default
];


LogUtility::warning('Error:', [
      'data' => $consumerMessage->getData(),
      'errorMessage' => $e->getMessage()
]);




return [
    'send-email' => [
        'handler' => SolarSeahorse\WebmanRedisQueue\Process\ConsumerProcess::class,
        'count' => 20, // 在目录模式中,目录下所有队列是共用进程
        'constructor' => [
            // 支持目录和类 推荐使用类名
            'consumer_source' => \App\queue\test\Email::class
        ]
    ]
];




namespace app\queue\test;

use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;

class SendEmail extends Consumer
{
    // 连接标识,对应config/plugin/solarseahorse/webman-redis-queue/redis.php的配置
    protected string $connection = 'default';

    public function consume(ConsumerMessageInterface $consumerMessage)
    {
        // TODO: Implement consume() method.
    }
}



namespace app\queue\test;

use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;

class SendCode extends Consumer
{
    // 连接标识,对应config/plugin/solarseahorse/webman-redis-queue/redis.php的配置
    protected string $connection = 'default';

    // 消费
    public function consume(ConsumerMessageInterface $consumerMessage)
    {
        // TODO: Implement consume() method.

        // 获取消息ID
        $messageId = $consumerMessage->getMessageId();

        // 获取队列数据
        $data = $consumerMessage->getData();

        var_dump($messageId);
    }
}




return array (
  'sendCode' => 
  array (
    'handler' => 'SolarSeahorse\\WebmanRedisQueue\\Process\\ConsumerProcess',
    'count' => 1,
    'constructor' => 
    array (
      'consumer_source' => 'app\\queue\\test\\SendCode',
    ),
  ),
);


/**
 * @param mixed|QueueMessageInterface $data
 * @return string|bool
 * @throws QueueMessagePushException
 */
 
 public function pushMessage(string|array|int|QueueMessageInterface $data): string|bool;



// 消息内容,无需序列化
$message = [
    'dummy' => 'ok'
];

// 生产者工厂方法
$messageId = QueueProducerFactory::create(app\queue\test\SendEmail::class)
    ->pushMessage($message);

// 通过消费类工厂方法 创建一个生产者
$messageId = app\queue\test\SendEmail::createQueueProducer()->pushMessage($message);

// 投递QueueMessage对象
$message = app\queue\test\SendEmail::createQueueMessage($message);

// 或者通过QueueMessageFactory创建一条消息
$message = QueueMessageFactory::create(app\queue\test\SendEmail::class,$message);

// 修改队列数据
$message->setData(['dummy' => 'no']);

// 设置错误次数
$message->setFailCount(3);

// 通过上方两种方法投递均可
$messageId = app\queue\test\SendEmail::createQueueProducer()->pushMessage($message);

var_export($messageId); // 返回stream的字符串ID 或 false


/**
 * @param array|QueueMessageInterface[] $dataArray
 * @return array|false
 * @throws QueueMessagePushException
 */
 
 public function pushMessages(array $dataArray): array|bool;



// 投递5w条消息
$dataArr = array_fill(0, 50000, null);

for ($i = 0; $i < 50000; $i++) {
    $dataArr[$i] = ['dummy' => uniqid()];
}

$messageIds = app\queue\test\SendEmail::createQueueProducer()->pushMessages($dataArr);


// QueueMessage方式

for ($i = 0; $i < 50000; $i++) {

    $message = QueueMessageFactory::create(app\queue\test\SendEmail::class, ['dummy' => uniqid()]);

    //$message->setData(json_encode(['123']));
    //$message->setFailCount(1);
    // ....

    $dataArr[$i] = $message;
}

$messageIds = app\queue\test\SendEmail::createQueueProducer()->pushMessages($dataArr);

var_export($messageIds); // 返回Stream消息ID列表 或 false
        

    /**
     * @param mixed|QueueMessageInterface $data
     * @param int $delay
     * @param string $identifier
     * @return bool
     * @throws ScheduleDelayedMessageException
     */
    public function scheduleDelayedMessage(string|array|int|QueueMessageInterface $data, int $delay = 0, string $identifier = ''): bool;


// 消息内容
$message = [
    'type' => 'warning',
    'to' => '[email protected]',
    'content' => '.....'
];

// 投递一条延时消息 60秒后处理
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message, 60);

// QueueMessage对象

$message = app\queue\test\SendEmail::createQueueMessage($message);

// 设置延时
$message->setDelay(60);

// 投递一条延时消息 60秒后处理
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message);

// 使用第二个参数会替换之前对象的延时设置
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message,80);



// 消息内容
$message = [
    'type' => 'warning',
    'to' => '[email protected]',
    'content' => '.....'
];

// 通过type,to参数生成一个唯一ID
$identifier = md5(serialize([
    'type' => 'warning',
    'to' => '[email protected]',
]));

// 投递一条延时消息 60秒后处理
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message, 60, $identifier);

// QueueMessage对象

$message = app\queue\test\SendEmail::createQueueMessage($message);

// 设置延时
$message->setDelay(60);

// 设置identifier
$message->setIdentifier($identifier);

// 投递一条延时消息 60秒后处理
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message);

// 传递参数会替换对象之前的延时和ID设置
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message, 80, $identifier);



// 投递10w条延时消息
$dataArr = array_fill(0, 100000, null);

for ($i = 0; $i < 100000; $i++) {
    $dataArr[$i] = [
        'delay' => 2, // 延时时间
        'data' => ['dummy' => uniqid()], // 队列数据
        'identifier' => '' // 自定义ID
    ];
}

// 批量投递
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessages($dataArr);

// QueueMessage对象

for ($i = 0; $i < 100000; $i++) {

    $message = app\queue\test\SendEmail::createQueueMessage(['dummy' => uniqid()]);

    // 设置延时
    $message->setDelay(60);

    // 设置identifier
    $message->setIdentifier('');

    $dataArr[$i] = $message;
}

// 批量投递
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessages($dataArr);


    /**
     * @param string $identifier
     * @return bool
     */
    public function removeDelayedMessage(string $identifier): bool;

    /**
     * @param array $identifiers
     * @return bool|array
     */
    public function removeDelayedMessages(array $identifiers): array|bool;

    /**
     * @param string $identifier
     * @return bool
     */
    public function hasDelayedMessageExists(string $identifier): bool;

    /**
     * @param array $identifiers
     * @return bool|array
     */
    public function hasDelayedMessagesExist(array $identifiers): array|bool;

$consumer = new app\queue\test\SendEmail();

$queueProducer = $consumer::createQueueProducer();

// 添加一条延时消息 通过业务数据生成消息ID
$queueProducer->scheduleDelayedMessage(['dummy' => 'ok'], 60, 'email_user_id');

// 通过QueueMessage对象
$queueMessage = $consumer::createQueueMessage(['dummy' => 'ok']);

$queueMessage->setDelay(60);

// 自定义消息ID 不设置将默认生成 通过getIdentifier()获取
$queueMessage->setIdentifier('test_id');

// 获取消息ID
$id = $queueMessage->getIdentifier();

// 判断消息是否存在
var_export(SendEmail::createQueueProducer()->hasDelayedMessageExists('identifier'));  // true or false

// 移除一条延时消息
var_export(SendEmail::createQueueProducer()->removeDelayedMessage('identifier')); // true or false

// 判断多条消息是否存在 返回一个数组
var_export(SendEmail::createQueueProducer()->hasDelayedMessagesExist(['identifier1', 'identifier1', 'identifier1']));

//.array (
//    0 => 1706383223.0,
//    1 => 1706383223.0,
//    2 => false
//).

// 移除多条延时消息 返回一个数组
var_export(SendEmail::createQueueProducer()->removeDelayedMessages(['identifier1', 'identifier1', 'identifier1']));

//
//.array (
//    0 => 1,
//    1 => 1,
//    2 => 0
//).  


// 检查是否超过最大重试次数
if ($queueMessage->getFailCount() >= $this->maxAttempts) {
    // 死信处理
    $this->handlerDeadLetterQueue($messageId, $consumerMessage, $e);
    return true;
}



     /**
     * 处理消息挂起超时 当pending列表中有超时未ack的消息会触发此方法
     * @param string $messageId
     * @param ConsumerMessageInterface $consumerMessage
     * @param string $consumerName
     * @param int $elapsedTime
     * @param int $deliveryCount
     * @return void
     * @throws RedisException
     * @throws ScheduleDelayedMessageException
     * @throws Throwable
     */
    public function handlerPendingTimeoutMessages(string $messageId, ConsumerMessageInterface $consumerMessage, string $consumerName, int $elapsedTime, int $deliveryCount): void
    {
        switch ($this->getPendingProcessingStrategy()) {
            case self::PENDING_PROCESSING_IGNORE: // 忽略pending超时

                // 确认消息
                $consumerMessage->ack();

                // 触发死信处理
                $this->handlerDeadLetterQueue($messageId, $consumerMessage, new Exception(
                    'PENDING_PROCESSING_IGNORE: Message pending timeout.'
                ));
                break;
            case self::PENDING_PROCESSING_RETRY: // pending超时重试

                // 触发死信处理
                if ($deliveryCount + 1 > $this->getMaxAttempts()) {

                    // ack消息
                    $consumerMessage->ack();

                    $this->handlerDeadLetterQueue(
                        $messageId,
                        $consumerMessage,
                        new Exception(
                            'PENDING_PROCESSING_RETRY: The number of message delivery times exceeds the maximum number of retries.'
                        ));

                    return;
                }

                // 处理重试
                $handlerStatus = $this->handlerFailRetry(
                    $messageId,
                    $consumerMessage,
                    new Exception('PENDING_PROCESSING_RETRY: Message pending timeout retry.')
                );

                if ($handlerStatus) {
                    $consumerMessage->ack();
                }
                break;
        }
    }



// 获取队列的Redis连接
$sendCode = new app\queue\test\SendCode();

$redisConnection = $sendCode->getRedisConnection();

// 使用方法和phpredis扩展一致

$redisConnection->xLen();

$redisConnection->sAdd();

// 在消费类中可以直接使用$this->getRedisConnection();

....更多





use SolarSeahorse\WebmanRedisQueue\Queue\QueueMessage;

队列的Redis连接
$sendCode = new app\queue\test\SendCode();

$redisConnection = $sendCode->getRedisConnection();

// 使用方法和phpredis扩展一致
$streamKey = $sendCode->getStreamKey();
$start = '-'; // 表示从 Stream 的最开始读取
$end = '+';   // 表示读取到 Stream 的最末尾
$count = 100;  // 指定要读取的消息数量

// 读取Stream列表,不包括pending
$messages = $redisConnection->xRange($streamKey, $start, $end, $count);

$deleteMessageIds = [];

foreach ($messages as $messageId => $message) {

    // 解析原始消息内容
    $messageArr = QueueMessage::parseRawMessage($message);

    if (!$messageArr) { // 未知消息
        $deleteMessageIds[] = $messageId;
        continue;
    }

    // 转换为QueueMessage方便操作
    $queueMessage = QueueMessage::createFromArray($messageArr);

    // 通过获取消息时间戳,如果消息已经存在超过1个小时 标记删除。
    if (time() - $queueMessage->getTimestamp() > 3600) {
        $deleteMessageIds[] = $messageId;
    }

}

// 批量删除消息
$redisConnection->xDel($streamKey, $deleteMessageIds);




namespace app\queue\test;

use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;

class SendEmail extends Consumer
{
    // 连接标识,对应redis.php的配置 默认default
    protected string $connection = 'default';

    public function consume(ConsumerMessageInterface $consumerMessage)
    {
        // TODO: Implement consume() method.

        // 获取消息ID
        $messageId = $consumerMessage->getMessageId();

        // 获取队列数据
        $data = $consumerMessage->getData();

        // 禁用错误重试 如果消费失败将不会异常重试
        $consumerMessage->disableFailRetry();

        // 手动触发错误重试,此方法会调用disableFailRetry方法,所以后续报错不会再触发异常重试。
        // 没有禁用错误重试的情况下,消费异常默认会调用此方法。
        $consumerMessage->triggerError(new \Exception('triggerError'));

        // 监听消费异常事件
        $consumerMessage->onError(function (\Throwable $e, ConsumerMessageInterface $consumerMessage) {
            // 这里可以处理消费异常逻辑

            // 禁用错误重试
            $consumerMessage->disableFailRetry();

            // 添加日志等等
            // 如果在消费方法中自行捕获 Throwable 此事件不会触发
        });

        // 业务逻辑执行完毕,ack确认消息 默认自动ack,但通常建议在业务逻辑中显式调用,比如ack失败进行事务回滚等等。
        $isAcked = $consumerMessage->ack();

        if (!$isAcked) {
            
        }

        // 或通过getAckStatus方法获取结果
        if (!$consumerMessage->getAckStatus()) {
            
        }

        // 获取原始队列消息 QueueMessage对象
        $queueMessage = $consumerMessage->getQueueMessage();

        // 获取消息错误次数...
        $failCount = $queueMessage->getFailCount();
        // 更多...
    }
}



namespace app\queue\test;

use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;

class SendEmail extends Consumer
{
    // 连接标识,对应redis.php的配置 默认default
    protected string $connection = 'default';

    // 将pending处理策略调整为PENDING_PROCESSING_IGNORE 消息挂起超时将不会进行重试
    protected int $pendingProcessingStrategy = self::PENDING_PROCESSING_IGNORE;

    public function consume(ConsumerMessageInterface $consumerMessage)
    {
        // TODO: Implement consume() method.

        // 获取消息ID
        $messageId = $consumerMessage->getMessageId();

        // 获取队列数据
        $data = $consumerMessage->getData();

        // 监听异常
        $consumerMessage->onError(function (\Throwable $e){

            // 记录邮件发送失败日志
        });
        
        // 禁用重试
        $consumerMessage->disableFailRetry();

        // 发送一封邮件 ....

        // 确认消息
        $consumerMessage->ack();
    }
}


/**
     * 处理错误重试
     * @param $messageId
     * @param ConsumerMessageInterface $consumerMessage
     * @param Throwable $e
     * @return bool
     * @throws ScheduleDelayedMessageException
     * @throws RedisException
     * @throws Throwable
     */
    public function handlerFailRetry($messageId, ConsumerMessageInterface $consumerMessage, Throwable $e): bool
    {
        $queueMessage = $consumerMessage->getQueueMessage();

        // 检查是否超过最大重试次数
        if ($queueMessage->getFailCount() >= $this->maxAttempts) {
            // 死信处理
            $this->handlerDeadLetterQueue($messageId, $consumerMessage, $e);
            return true;
        }

        $queueMessage->incrementFailCount(); // Fail count + 1

        // 计算下次重试时间
        $retrySeconds = $queueMessage->getFailCount() * $this->retrySeconds;

        // 更新下次重试时间
        $queueMessage->updateNextRetry($retrySeconds);

        // 设置消息延时
        $queueMessage->setDelay($retrySeconds);

        // 设置消息ID 避免重复任务
        $queueMessage->setIdentifier($messageId);

        // 重新发布至延时队列
        return self::createQueueProducer()->scheduleDelayedMessage($queueMessage);
    }
	



namespace app\queue\test;

use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;
use Throwable;

class SendEmail extends Consumer
{
    // 连接标识,对应redis.php的配置 默认default
    protected string $connection = 'default';

    // 将pending处理策略调整为PENDING_PROCESSING_IGNORE 消息挂起超时将不会进行重试
    protected int $pendingProcessingStrategy = self::PENDING_PROCESSING_IGNORE;

    public function consume(ConsumerMessageInterface $consumerMessage)
    {
        // TODO: Implement consume() method.

        // 获取消息ID
        $messageId = $consumerMessage->getMessageId();

        // 获取队列数据
        $data = $consumerMessage->getData();

        // 监听异常
        $consumerMessage->onError(function (\Throwable $e){

            // 记录邮件发送失败日志
        });

        // 禁用重试
        $consumerMessage->disableFailRetry();

        // 发送一封邮件 ....

        // 确认消息
        $consumerMessage->ack();
    }

    public function handlerFailRetry($messageId, ConsumerMessageInterface $consumerMessage, Throwable $e): bool
    {
        // 不改动原本的错误处理 也可以完全自定义实现。
        parent::handlerFailRetry($messageId, $consumerMessage, $e);

	   // 如果队列在业务数据库中还有一个tasks表进行调度,在这里可以更新task数据 比如 错误次数+1
    }
}


    /**
     * 处理死信 超过最大重试次数或pending超时PENDING_PROCESSING_IGNORE策略 会调用此方法
     * @param $messageId
     * @param ConsumerMessageInterface $consumerMessage
     * @param Throwable $e
     * @return void
     */
    public function handlerDeadLetterQueue($messageId, ConsumerMessageInterface $consumerMessage, Throwable $e): void
    {
        $queueMessage = $consumerMessage->getQueueMessage();

        // 添加日志
        LogUtility::warning('dead_letter_queue: ', [
            'messageId' => $messageId,
            'message' => $queueMessage->toArray(),
            'failCount' => $queueMessage->getFailCount(),
            'errorMsg' => $e->getMessage(),
            'trace' => $e->getTraceAsString()
        ]);
        
        // 更多...
    }