PHP code example of imiphp / imi-amqp

1. Go to this page and download the library: Download imiphp/imi-amqp library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

imiphp / imi-amqp example snippets


[
    'components'    =>  [
        // 引入组件
        'AMQP'   =>  'Imi\AMQP',
    ],
]

[
    'pools'    =>    [
        'rabbit'    =>  [
            'sync'    =>    [
                'pool'    =>    [
                    'class'        =>    \Imi\AMQP\Pool\AMQPSyncPool::class,
                    'config'    =>    [
                        'maxResources'    =>    10,
                        'minResources'    =>    0,
                    ],
                ],
                'resource'    =>    [
                    'host'      => '127.0.0.1',
                    'port'      => 5672,
                    'user'      => 'guest',
                    'password'  => 'guest',
                ]
            ],
            'async'    =>    [
                'pool'    =>    [
                    'class'        =>    \Imi\AMQP\Pool\AMQPCoroutinePool::class,
                    'config'    =>    [
                        'maxResources'    =>    10,
                        'minResources'    =>    1,
                    ],
                ],
                'resource'    =>    [
                    'host'      => '127.0.0.1',
                    'port'      => 5672,
                    'user'      => 'guest',
                    'password'  => 'guest',
                ]
            ],
        ],
    ]
]

[
    'beans' =>  [
        'AMQP'  =>  [
            'defaultPoolName'   =>  'rabbit',
        ],
    ],
]


namespace AMQPApp\AMQP\Test2;

use Imi\AMQP\Message;

class TestMessage2 extends Message
{
    /**
     * 用户ID
     *
     * @var int
     */
    private $memberId;

    /**
     * 内容
     *
     * @var string
     */
    private $content;

    public function __construct()
    {
        parent::__construct();
        $this->routingKey = 'imi-2';
        $this->format = \Imi\Util\Format\Json::class;
    }

    /**
     * 设置主体数据
     *
     * @param mixed $data
     * @return self
     */
    public function setBodyData($data)
    {
        foreach($data as $k => $v)
        {
            $this->$k = $v;
        }
    }

    /**
     * 获取主体数据
     *
     * @return mixed
     */
    public function getBodyData()
    {
        return [
            'memberId'  =>  $this->memberId,
            'content'   =>  $this->content,
        ];
    }

    /**
     * Get 用户ID
     *
     * @return int
     */ 
    public function getMemberId()
    {
        return $this->memberId;
    }

    /**
     * Set 用户ID
     *
     * @param int $memberId  用户ID
     *
     * @return self
     */ 
    public function setMemberId(int $memberId)
    {
        $this->memberId = $memberId;

        return $this;
    }

    /**
     * Get 内容
     *
     * @return string
     */ 
    public function getContent()
    {
        return $this->content;
    }

    /**
     * Set 内容
     *
     * @param string $content  内容
     *
     * @return self
     */ 
    public function setContent(string $content)
    {
        $this->content = $content;

        return $this;
    }

}


namespace AMQPApp\AMQP\Test;

use Imi\Bean\Annotation\Bean;
use Imi\AMQP\Annotation\Queue;
use Imi\AMQP\Base\BasePublisher;
use Imi\AMQP\Annotation\Consumer;
use Imi\AMQP\Annotation\Exchange;
use Imi\AMQP\Annotation\Publisher;
use Imi\AMQP\Annotation\Connection;

/**
 * @Bean("TestPublisher")
 * @Connection(host="127.0.0.1", port=5672, user="guest", password="guest")
 * @Publisher(tag="tag-imi", queue="queue-imi-1", exchange="exchange-imi", routingKey="imi-1")
 * @Queue(name="queue-imi-1", routingKey="imi-1")
 * @Exchange(name="exchange-imi")
 */
class TestPublisher extends BasePublisher
{

}


namespace AMQPApp\AMQP\Test;

use Imi\Redis\Redis;
use Imi\Bean\Annotation\Bean;
use Imi\AMQP\Annotation\Queue;
use Imi\AMQP\Base\BaseConsumer;
use Imi\AMQP\Contract\IMessage;
use Imi\AMQP\Annotation\Consumer;
use Imi\AMQP\Annotation\Exchange;
use Imi\AMQP\Enum\ConsumerResult;
use Imi\AMQP\Annotation\Connection;

/**
 * 启动一个新连接消费
 * 
 * @Bean("TestConsumer")
 * @Connection(host="127.0.0.1", port=5672, user="guest", password="guest")
 * @Consumer(tag="tag-imi", queue="queue-imi-1", message=\AMQPApp\AMQP\Test\TestMessage::class)
 */
class TestConsumer extends BaseConsumer
{
    /**
     * 消费任务
     *
     * @param \AMQPApp\AMQP\Test\TestMessage $message
     */
    protected function consume(IMessage $message): int
    {
        var_dump(__CLASS__, $message->getBody(), get_class($message));
        Redis::set('imi-amqp:consume:1:' . $message->getMemberId(), $message->getBody());
        return ConsumerResult::ACK;
    }

}


[
    'components'    =>  [
        'AMQP'  =>  'Imi\AMQP',
    ],
    'beans' =>  [
        'AutoRunProcessManager' =>  [
            'processes' =>  [
                // 加入队列消费进程,非必须,你也可以自己写进程消费
                'QueueConsumer',
            ],
        ],
        'imiQueue'  =>  [
            // 默认队列
            'default'   =>  'test1',
            // 队列列表
            'list'  =>  [
                // 队列名称
                'test1' =>  [
                    // 使用的队列驱动
                    'driver'        =>  'AMQPQueueDriver',
                    // 消费协程数量
                    'co'            =>  1,
                    // 消费进程数量;可能会受进程分组影响,以同一组中配置的最多进程数量为准
                    'process'       =>  1,
                    // 消费循环尝试 pop 的时间间隔,单位:秒(仅使用消费者类时有效)
                    'timespan'      =>  0.1,
                    // 进程分组名称
                    'processGroup'  =>  'a',
                    // 自动消费
                    'autoConsumer'  =>  true,
                    // 消费者类
                    'consumer'      =>  'AConsumer',
                    // 驱动类所需要的参数数组
                    'config'        =>  [
                        // AMQP 连接池名称
                        'poolName'      =>  'amqp',
                        // Redis 连接池名称
                        'redisPoolName'=>  'redis',
                        // Redis 键名前缀
                        'redisPrefix'   =>  'test1:',
                        // 可选配置:
                        // 支持消息删除功能,依赖 Redis
                        'supportDelete' =>  true,
                        // 支持消费超时队列功能,依赖 Redis,并且自动增加一个队列
                        'supportTimeout' =>  true,
                        // 支持消费失败队列功能,自动增加一个队列
                        'supportFail' =>  true,
                        // 循环尝试 pop 的时间间隔,单位:秒
                        'timespan'  =>  0.03,
                        // 本地缓存的队列长度。由于 AMQP 不支持主动pop,而是主动推送,所以本地会有缓存队列,这个队列不宜过大。
                        'queueLength'   =>  16,
                        // 消息类名
                        'message'   =>  \Imi\AMQP\Queue\JsonAMQPMessage::class,
                    ]
                ],
            ],
        ],
    ]
]