PHP code example of imiphp / imi-amqp
1. Go to this page and download the library: Download imiphp/imi-amqp library . Choose the download type require .
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
imiphp / imi-amqp example snippets
[
'components' => [
// 引入组件
'AMQP' => 'Imi\AMQP',
],
]
[
'pools' => [
'rabbit' => [
'sync' => [
'pool' => [
'class' => \Imi\AMQP\Pool\AMQPSyncPool::class,
'config' => [
'maxResources' => 10,
'minResources' => 0,
],
],
'resource' => [
'host' => '127.0.0.1',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
]
],
'async' => [
'pool' => [
'class' => \Imi\AMQP\Pool\AMQPCoroutinePool::class,
'config' => [
'maxResources' => 10,
'minResources' => 1,
],
],
'resource' => [
'host' => '127.0.0.1',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
]
],
],
]
]
[
'beans' => [
'AMQP' => [
'defaultPoolName' => 'rabbit',
],
],
]
namespace AMQPApp\AMQP\Test2;
use Imi\AMQP\Message;
class TestMessage2 extends Message
{
/**
* 用户ID
*
* @var int
*/
private $memberId;
/**
* 内容
*
* @var string
*/
private $content;
public function __construct()
{
parent::__construct();
$this->routingKey = 'imi-2';
$this->format = \Imi\Util\Format\Json::class;
}
/**
* 设置主体数据
*
* @param mixed $data
* @return self
*/
public function setBodyData($data)
{
foreach($data as $k => $v)
{
$this->$k = $v;
}
}
/**
* 获取主体数据
*
* @return mixed
*/
public function getBodyData()
{
return [
'memberId' => $this->memberId,
'content' => $this->content,
];
}
/**
* Get 用户ID
*
* @return int
*/
public function getMemberId()
{
return $this->memberId;
}
/**
* Set 用户ID
*
* @param int $memberId 用户ID
*
* @return self
*/
public function setMemberId(int $memberId)
{
$this->memberId = $memberId;
return $this;
}
/**
* Get 内容
*
* @return string
*/
public function getContent()
{
return $this->content;
}
/**
* Set 内容
*
* @param string $content 内容
*
* @return self
*/
public function setContent(string $content)
{
$this->content = $content;
return $this;
}
}
namespace AMQPApp\AMQP\Test;
use Imi\Bean\Annotation\Bean;
use Imi\AMQP\Annotation\Queue;
use Imi\AMQP\Base\BasePublisher;
use Imi\AMQP\Annotation\Consumer;
use Imi\AMQP\Annotation\Exchange;
use Imi\AMQP\Annotation\Publisher;
use Imi\AMQP\Annotation\Connection;
/**
* @Bean("TestPublisher")
* @Connection(host="127.0.0.1", port=5672, user="guest", password="guest")
* @Publisher(tag="tag-imi", queue="queue-imi-1", exchange="exchange-imi", routingKey="imi-1")
* @Queue(name="queue-imi-1", routingKey="imi-1")
* @Exchange(name="exchange-imi")
*/
class TestPublisher extends BasePublisher
{
}
namespace AMQPApp\AMQP\Test;
use Imi\Redis\Redis;
use Imi\Bean\Annotation\Bean;
use Imi\AMQP\Annotation\Queue;
use Imi\AMQP\Base\BaseConsumer;
use Imi\AMQP\Contract\IMessage;
use Imi\AMQP\Annotation\Consumer;
use Imi\AMQP\Annotation\Exchange;
use Imi\AMQP\Enum\ConsumerResult;
use Imi\AMQP\Annotation\Connection;
/**
* 启动一个新连接消费
*
* @Bean("TestConsumer")
* @Connection(host="127.0.0.1", port=5672, user="guest", password="guest")
* @Consumer(tag="tag-imi", queue="queue-imi-1", message=\AMQPApp\AMQP\Test\TestMessage::class)
*/
class TestConsumer extends BaseConsumer
{
/**
* 消费任务
*
* @param \AMQPApp\AMQP\Test\TestMessage $message
*/
protected function consume(IMessage $message): int
{
var_dump(__CLASS__, $message->getBody(), get_class($message));
Redis::set('imi-amqp:consume:1:' . $message->getMemberId(), $message->getBody());
return ConsumerResult::ACK;
}
}
[
'components' => [
'AMQP' => 'Imi\AMQP',
],
'beans' => [
'AutoRunProcessManager' => [
'processes' => [
// 加入队列消费进程,非必须,你也可以自己写进程消费
'QueueConsumer',
],
],
'imiQueue' => [
// 默认队列
'default' => 'test1',
// 队列列表
'list' => [
// 队列名称
'test1' => [
// 使用的队列驱动
'driver' => 'AMQPQueueDriver',
// 消费协程数量
'co' => 1,
// 消费进程数量;可能会受进程分组影响,以同一组中配置的最多进程数量为准
'process' => 1,
// 消费循环尝试 pop 的时间间隔,单位:秒(仅使用消费者类时有效)
'timespan' => 0.1,
// 进程分组名称
'processGroup' => 'a',
// 自动消费
'autoConsumer' => true,
// 消费者类
'consumer' => 'AConsumer',
// 驱动类所需要的参数数组
'config' => [
// AMQP 连接池名称
'poolName' => 'amqp',
// Redis 连接池名称
'redisPoolName'=> 'redis',
// Redis 键名前缀
'redisPrefix' => 'test1:',
// 可选配置:
// 支持消息删除功能,依赖 Redis
'supportDelete' => true,
// 支持消费超时队列功能,依赖 Redis,并且自动增加一个队列
'supportTimeout' => true,
// 支持消费失败队列功能,自动增加一个队列
'supportFail' => true,
// 循环尝试 pop 的时间间隔,单位:秒
'timespan' => 0.03,
// 本地缓存的队列长度。由于 AMQP 不支持主动pop,而是主动推送,所以本地会有缓存队列,这个队列不宜过大。
'queueLength' => 16,
// 消息类名
'message' => \Imi\AMQP\Queue\JsonAMQPMessage::class,
]
],
],
],
]
]