Download the PHP package ssh/amqp without Composer
On this page you can find all versions of the php package ssh/amqp. It is possible to download/install these versions without Composer. Possible dependencies are resolved automatically.
Please rate this library. Is it a good library?
Informations about the package amqp
AMQP
AMQP library wrapper for php-amqplib,基于 php-amqplib 封装的 RabbitMQ 客户端组件。
安装
或者手动安装:
配置
在 webman 框架中创建配置文件 config/plugin/webman/amqp/amqp.php:
发送消息
方式一:静态调用
方式二:实例调用
方式三:发送带属性的消息
消费消息
创建消费者类
创建一个实现 Consumer 接口的类:
带交换机和路由键的消费者
在 webman 中配置消费者进程
在 config/plugin/webman/amqp/process.php 中添加:
消费者工作流程
当消费者进程启动后,会执行以下步骤:
- 扫描消费者类:扫描指定目录下的所有 PHP 文件,查找实现了
ssh\Amqp\Consumer接口的类 - 设置消费者:为每个消费者类创建 AMQP 连接、声明交换机和队列、绑定关系
- 启动消息循环:进入无限循环,持续监听消息并调用相应的消费者处理
- 自动重连:如果连接断开,会自动尝试重连
- 异常处理:消息处理过程中的异常会被捕获并记录,同时根据配置决定是否重新入队
消费者接口说明
必须实现的方法
可选属性
connection: 连接名称,默认 'default'exchange: 交换机名称(可选)exchange_type: 交换机类型,默认 'direct'exchange_durable: 交换机是否持久化,默认 truerouting_key: 路由键(可选)queue_durable: 队列是否持久化,默认 truequeue_exclusive: 队列是否独占,默认 falsequeue_auto_delete: 队列是否自动删除,默认 falsequeue_arguments: 队列额外参数no_ack: 是否自动确认,默认 falseprefetch_count: 预取消息数量,默认 1config: 配置名称(可选)
可选方法
shouldRequeue(\Exception $e): 判断处理失败的消息是否应该重新入队,返回 true 表示重新入队,false 表示丢弃。默认返回 true。
高级用法
手动声明交换机和队列
批量消费消息
消息属性
消费者接收的 $properties 数组包含以下可能的字段:
content_type: 内容类型content_encoding: 内容编码delivery_mode: 投递模式priority: 优先级correlation_id: 关联 IDreply_to: 回复地址expiration: 过期时间message_id: 消息 IDtimestamp: 时间戳type: 消息类型user_id: 用户 IDapp_id: 应用 ID
异常处理
异常类
组件提供了以下异常类:
ssh\Amqp\Exception\AmqpException: 所有 AMQP 异常的基类ssh\Amqp\Exception\ConnectionException: 连接相关的异常ssh\Amqp\Exception\ChannelException: 通道相关的异常ssh\Amqp\Exception\PublishException: 发布消息相关的异常ssh\Amqp\Exception\ConsumeException: 消费消息相关的异常ssh\Amqp\Exception\QueueException: 队列相关的异常ssh\Amqp\Exception\ExchangeException: 交换机相关的异常
发送消息时的异常处理
消费者中的异常处理
消费者进程会自动处理消息处理过程中的异常,并记录日志。如果消费者实现了 shouldRequeue 方法,可以控制失败的消息是否重新入队:
连接管理
Client 类提供了以下方法来管理连接:
配置日志
在消费者进程中可以配置日志记录器:
API 参考
Client 类方法
静态方法
Client::connection($name = 'default', $config = null): 获取连接实例Client::send($queue, $body, $connection = 'default', $config = null, $properties = [], $exchange = '', $routing_key = null, $onSuccess = null, $onError = null): 发送消息,返回 bool
实例方法
publish($queue, $exchange, $routing_key, $msg, $mandatory, $immediate, $ticket, $onSuccess, $onError): 发布消息,返回 booldeclareQueue(...): 声明队列declareExchange(...): 声明交换机bindQueue(...): 绑定队列consume(...): 消费消息ack($delivery_tag, $multiple, $requeue): 确认消息nack($delivery_tag, $multiple, $requeue): 拒绝消息reject($delivery_tag, $requeue): 拒绝单条消息qos(...): 设置 QoSwait($timeout): 等待消息close(): 关闭连接isConnected(): 检查连接是否正常reconnect(): 重新连接getChannel(): 获取通道对象getConnection(): 获取连接对象
回调参数说明
$onSuccess: 成功回调函数,参数为($msg, $queue, $exchange, $routing_key)$onError: 失败回调函数,参数为($e, $msg, $queue, $exchange, $routing_key)
依赖
- PHP >= 7.4
- php-amqplib/php-amqplib >= 3.5
License
MIT
All versions of amqp with dependencies
PHP Build Version
Package Version
The package ssh/amqp contains the following files
Loading the files please wait ...