PHP code example of ssh / amqp
1. Go to this page and download the library: Download ssh/amqp library . Choose the download type require .
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
ssh / amqp example snippets
return [
'default' => [
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'vhost' => '/',
'options' => [
'connection_timeout' => 3.0,
'read_write_timeout' => 3.0,
'heartbeat' => 0,
'max_reconnect_attempts' => 3, // 最大重连次数
'reconnect_delay' => 1, // 重连间隔(秒)
],
],
'cluster' => [
'host' => '192.168.1.100',
'port' => 5672,
'user' => 'admin',
'password' => 'admin123',
'vhost' => '/',
'options' => [
'max_reconnect_attempts' => 5,
'reconnect_delay' => 2,
],
],
];
use ssh\Amqp\Client;
// 发送字符串消息(默认交换机,routing_key = 队列名)
Client::send('my_queue', 'Hello World!');
// 使用指定连接
Client::send('my_queue', 'Hello World!', 'consumer');
// 使用指定连接和配置
Client::send('my_queue', 'Hello World!', 'consumer', 'plugin.rabbitmq.rabbitmq');
// 发送到交换机
Client::send(
'my_queue', // 队列名
'Hello World!', // 消息内容
'consumer', // 连接名
'plugin.rabbitmq.rabbitmq', // 配置名(可选)
[], // 消息属性
'my_exchange', // 交换机名
'my_routing_key' // 路由键(可选,默认为队列名)
);
// 发送 JSON 数据
Client::send('my_queue', json_encode(['id' => 1, 'name' => 'test']));
// 带成功/失败回调
Client::send(
'my_queue',
'Hello World!',
'consumer',
'plugin.rabbitmq.rabbitmq',
[],
'',
'my_queue',
function($msg, $queue, $exchange, $routing_key) {
echo "发送成功!\n";
echo "消息: " . $msg->body . "\n";
},
function($e, $msg, $queue, $exchange, $routing_key) {
echo "发送失败:" . $e->getMessage() . "\n";
}
);
// 使用返回值判断
try {
$result = Client::send('my_queue', 'Hello World!');
if ($result) {
echo "发送成功!\n";
}
} catch (\Exception $e) {
echo "发送失败:" . $e->getMessage() . "\n";
}
use ssh\Amqp\Client;
$client = Client::connection('consumer');
// 简单发送
$result = $client->publish('my_queue', '', 'my_routing_key', 'Hello AMQP!');
if ($result) {
echo "发送成功!\n";
}
// 发送消息到指定交换机
$client->publish('my_queue', 'my_exchange', 'my_routing_key', 'Hello AMQP!');
// 带回调的发送
$client->publish(
'my_queue',
'',
'my_routing_key',
'Hello AMQP!',
false,
false,
0,
function($msg, $queue, $exchange, $routing_key) {
echo "发送成功!\n";
},
function($e, $msg, $queue, $exchange, $routing_key) {
echo "发送失败:" . $e->getMessage() . "\n";
}
);
use ssh\Amqp\Client;
use PhpAmqpLib\Message\AMQPMessage;
$msg = new AMQPMessage('Hello World!', [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'content_type' => 'application/json',
'priority' => 5,
]);
$client = Client::connection('default');
$client->publish('my_queue', 'my_exchange', 'my_routing_key', $msg);
namespace app\amqp;
use ssh\Amqp\Client;
use ssh\Amqp\Consumer;
use PhpAmqpLib\Message\AMQPMessage;
class MyConsumer implements Consumer
{
// 连接名称,默认 'default'
public $connection = 'default';
// 队列名称
public $queue = 'my_queue';
// 是否自动确认,false 表示需要手动 ack
public $no_ack = false;
// 预取消息数量
public $prefetch_count = 1;
/**
* 消费消息
*
* @param string $data 消息内容
* @param array $properties 消息属性
* @param AMQPMessage $msg AMQP 消息对象
* @param Client $client AMQP 客户端实例
*/
public function consume($data, $properties, $msg, $client)
{
// 处理消息
echo "Received: {$data}\n";
// 手动确认消息
$client->ack($msg);
// 或者拒绝消息并重新入队
// $client->nack($msg, false, true);
}
}
namespace app\amqp;
use ssh\Amqp\Client;
use ssh\Amqp\Consumer;
use PhpAmqpLib\Message\AMQPMessage;
class ExchangeConsumer implements Consumer
{
public $connection = 'default';
// 交换机名称
public $exchange = 'my_exchange';
// 交换机类型:direct, fanout, topic
public $exchange_type = 'direct';
public $exchange_durable = true;
// 队列名称
public $queue = 'my_queue';
public $queue_durable = true;
// 路由键
public $routing_key = 'my_routing_key';
// 消息确认模式
public $no_ack = false;
// 预取数量
public $prefetch_count = 10;
public function consume($data, $properties, $msg, $client)
{
echo "Received: {$data}\n";
// 处理业务逻辑
try {
// 处理成功,确认消息
$client->ack($msg);
} catch (\Exception $e) {
// 处理失败,拒绝消息并重新入队
$client->nack($msg, false, true);
}
}
}
return [
// 消费者进程
ssh\Amqp\Exception\Process\Consumer::class => [
'consumer_dir' => base_path() . '/app/amqp', // 消费者类所在目录
],
];
interface Consumer
{
/**
* 返回队列名称
*
* @return string
*/
public function queue();
/**
* 消费消息
*
* @param string $data 消息内容
* @param array $properties 消息属性
* @param AMQPMessage $msg AMQP 消息对象
* @param Client $client AMQP 客户端
*/
public function consume($data, $properties, $msg, $client);
}
use ssh\Amqp\Exception\Client;
use PhpAmqpLib\Exchange\AMQPExchangeType;
$client = Client::connection('default');
// 声明交换机
$client->declareExchange(
'my_exchange',
AMQPExchangeType::DIRECT,
false, // passive
true, // durable
false // auto_delete
);
// 声明队列
$client->declareQueue(
'my_queue',
false, // passive
true, // durable
false, // exclusive
false, // nowait
null // arguments
);
// 绑定队列到交换机
$client->bindQueue(
'my_queue',
'my_exchange',
'my_routing_key'
);
use ssh\Amqp\Exception\Client;
$client = Client::connection('default');
// 设置预取数量
$client->qos(0, 10);
// 开始消费
$client->consume('my_queue', '', false, false, false, false, function ($msg) {
echo "Received: {$msg->body}\n";
});
// 保持运行
while (true) {
$client->wait();
}
use ssh\Amqp\Exception\Client;
use ssh\Amqp\Exception\Exception\ConnectionException;
use ssh\Amqp\Exception\Exception\PublishException;
try {
Client::send('my_queue', 'Hello World!');
} catch (ConnectionException $e) {
// 处理连接异常
echo "连接失败: " . $e->getMessage() . "\n";
// 可以尝试重试或降级处理
} catch (PublishException $e) {
// 处理发布异常
echo "发布消息失败: " . $e->getMessage() . "\n";
}
namespace app\amqp;
use ssh\Amqp\Client;
use ssh\Amqp\Consumer;
use PhpAmqpLib\Message\AMQPMessage;
class MyConsumer implements Consumer
{
public $connection = 'default';
public $queue = 'my_queue';
public $no_ack = false;
public $prefetch_count = 1;
/**
* 判断处理失败的消息是否应该重新入队
*
* @param \Exception $e
* @return bool
*/
public function shouldRequeue(\Exception $e)
{
// 对于数据库连接失败等临时问题,重新入队
if (strpos($e->getMessage(), 'connection') !== false) {
return true;
}
// 对于其他异常(如格式错误),不重新入队
return false;
}
public function consume($data, $properties, $msg, $client)
{
// 处理消息
$client->ack($msg);
}
}
use ssh\Amqp\Exception\Client;
$client = Client::connection('default');
// 检查连接是否正常
if ($client->isConnected()) {
echo "连接正常\n";
}
// 手动重连
$client->reconnect();
// 关闭连接
$client->close();
use ssh\Amqp\Process\Consumer;
use Monolog\Logger;
use Monolog\Handler\StreamHandler;
$logger = new Logger('amqp');
$logger->pushHandler(new StreamHandler('path/to/log/file.log', Logger::DEBUG));
$consumer = new Consumer(base_path() . '/app/amqp');
$consumer->setLogger($logger);