PHP code example of imiphp / imi-queue
1. Go to this page and download the library: Download imiphp/imi-queue library . Choose the download type require .
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
imiphp / imi-queue example snippets
[
'components' => [
'Queue' => 'Imi\Queue',
],
'beans' => [
'AutoRunProcessManager' => [
'processes' => [
// 加入队列消费进程,非必须,你也可以自己写进程消费
'QueueConsumer',
],
],
'imiQueue' => [
// 默认队列
'default' => 'test1',
// 队列列表
'list' => [
// 队列名称
'test1' => [
// 使用的队列驱动
'driver' => \Imi\Queue\Driver\RedisQueueDriver::class,
// 消费协程数量
'co' => 1,
// 消费进程数量;可能会受进程分组影响,以同一组中配置的最多进程数量为准
'process' => 1,
// 消费循环尝试 pop 的时间间隔,单位:秒(仅使用消费者类时有效)
'timespan' => 0.1,
// 进程分组名称
'processGroup' => 'a',
// 自动消费
'autoConsumer' => true,
// 消费者类
'consumer' => 'AConsumer',
// 驱动类所需要的参数数组
'config' => [
// redis 连接池名
'poolName' => 'redis',
// redis 键前缀
'prefix' => 'imi:queue:test:',
// 消费循环尝试 pop 的时间间隔,单位:秒(手动调用pop()方法有效)
'timespan' => 0.1,
]
],
],
],
]
]
namespace ImiApp\Consumer;
use Imi\Log\Log;
use Imi\Bean\Annotation\Bean;
use Imi\Queue\Contract\IMessage;
use Imi\Queue\Driver\IQueueDriver;
use Imi\Queue\Service\BaseQueueConsumer;
/**
* @Bean("AConsumer")
*/
class AConsumer extends BaseQueueConsumer
{
/**
* 处理消费
*
* @param \Imi\Queue\Contract\IMessage $message
* @param \Imi\Queue\Driver\IQueueDriver $queue
* @return void
*/
protected function consume(IMessage $message, IQueueDriver $queue)
{
Log::info(sprintf('[%s]%s:%s', $queue->getName(), $message->getMessageId(), $message->getMessage()));
$queue->success($message);
}
}
use \Imi\Queue\Facade\Queue;
$queue = Queue::getQueue('队列名称');
$message = new \Imi\Queue\Model\Message;
$message->setMessage('字符串的消息内容');
$message->setWorkingTimeout(0); // 设置工作超时时间,单位:秒,为0不限制
$queue->push($message);
// 延时消息,单位:秒
$queue->push($message, 1.5);
$message = $queue->pop();
if(null !== $message)
{
// 将消息标记为成功
$queue->success($message);
// 将消息标记为失败
$queue->fail($message);
// 将消息标记为失败,并重回队列
$queue->fail($message, true);
}
$message = $queue->pop();
if(null !== $message)
{
$queue->delete($message);
}
use \Imi\Queue\Enum\QueueType;
$queue->clear(); // 清空全部
// 清空指定类型
$queue->clear([
QueueType::READY, // 准备就绪
QueueType::WORKING, // 工作中
QueueType::FAIL, // 失败
QueueType::TIMEOUT, // 超时
QueueType::DELAY, // 准备就绪延时
]);
// 返回 \Imi\Queue\Model\QueueStatus 类型
$status = $queue->status();
$status->getReady(); // 准备就绪数量
$status->getWorking(); // 工作中数量
$status->getFail(); // 失败数量
$status->getTimeout(); // 超时数量
$status->getDelay(); // 延时数量
$queue->restoreFailMessages();
$queue->restoreTimeoutMessages();