PHP code example of imiphp / imi-queue

1. Go to this page and download the library: Download imiphp/imi-queue library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

imiphp / imi-queue example snippets


[
    'components'    =>  [
        'Queue'  =>  'Imi\Queue',
    ],
    'beans' =>  [
        'AutoRunProcessManager' =>  [
            'processes' =>  [
                // 加入队列消费进程,非必须,你也可以自己写进程消费
                'QueueConsumer',
            ],
        ],
        'imiQueue'  =>  [
            // 默认队列
            'default'   =>  'test1',
            // 队列列表
            'list'  =>  [
                // 队列名称
                'test1' =>  [
                    // 使用的队列驱动
                    'driver'        =>  \Imi\Queue\Driver\RedisQueueDriver::class,
                    // 消费协程数量
                    'co'            =>  1,
                    // 消费进程数量;可能会受进程分组影响,以同一组中配置的最多进程数量为准
                    'process'       =>  1,
                    // 消费循环尝试 pop 的时间间隔,单位:秒(仅使用消费者类时有效)
                    'timespan'      =>  0.1,
                    // 进程分组名称
                    'processGroup'  =>  'a',
                    // 自动消费
                    'autoConsumer'  =>  true,
                    // 消费者类
                    'consumer'      =>  'AConsumer',
                    // 驱动类所需要的参数数组
                    'config'        =>  [
                        // redis 连接池名
                        'poolName'  =>  'redis',
                        // redis 键前缀
                        'prefix'    =>  'imi:queue:test:',
                        // 消费循环尝试 pop 的时间间隔,单位:秒(手动调用pop()方法有效)
                        'timespan'  =>  0.1,
                    ]
                ],
            ],
        ],
    ]
]


namespace ImiApp\Consumer;

use Imi\Log\Log;
use Imi\Bean\Annotation\Bean;
use Imi\Queue\Contract\IMessage;
use Imi\Queue\Driver\IQueueDriver;
use Imi\Queue\Service\BaseQueueConsumer;

/**
 * @Bean("AConsumer")
 */
class AConsumer extends BaseQueueConsumer
{
    /**
     * 处理消费
     * 
     * @param \Imi\Queue\Contract\IMessage $message
     * @param \Imi\Queue\Driver\IQueueDriver $queue
     * @return void
     */
    protected function consume(IMessage $message, IQueueDriver $queue)
    {
        Log::info(sprintf('[%s]%s:%s', $queue->getName(), $message->getMessageId(), $message->getMessage()));
        $queue->success($message);
    }

}

use \Imi\Queue\Facade\Queue;
$queue = Queue::getQueue('队列名称');

$message = new \Imi\Queue\Model\Message;
$message->setMessage('字符串的消息内容');
$message->setWorkingTimeout(0); // 设置工作超时时间,单位:秒,为0不限制
$queue->push($message);
// 延时消息,单位:秒
$queue->push($message, 1.5);

$message = $queue->pop();
if(null !== $message)
{
    // 将消息标记为成功
    $queue->success($message);

    // 将消息标记为失败
    $queue->fail($message);

    // 将消息标记为失败,并重回队列
    $queue->fail($message, true);
}

$message = $queue->pop();
if(null !== $message)
{
    $queue->delete($message);
}

use \Imi\Queue\Enum\QueueType;

$queue->clear(); // 清空全部

// 清空指定类型
$queue->clear([
    QueueType::READY,   // 准备就绪
    QueueType::WORKING, // 工作中
    QueueType::FAIL,    // 失败
    QueueType::TIMEOUT, // 超时
    QueueType::DELAY,   // 准备就绪延时
]);

// 返回 \Imi\Queue\Model\QueueStatus 类型
$status = $queue->status();
$status->getReady();    // 准备就绪数量
$status->getWorking();  // 工作中数量
$status->getFail();     // 失败数量
$status->getTimeout();  // 超时数量
$status->getDelay();    // 延时数量

$queue->restoreFailMessages();

$queue->restoreTimeoutMessages();