PHP code example of ssh / amqp

1. Go to this page and download the library: Download ssh/amqp library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

ssh / amqp example snippets



return [
    'default' => [
        'host' => 'localhost',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'vhost' => '/',
        'options' => [
            'connection_timeout' => 3.0,
            'read_write_timeout' => 3.0,
            'heartbeat' => 0,
            'max_reconnect_attempts' => 3, // 最大重连次数
            'reconnect_delay' => 1, // 重连间隔(秒)
        ],
    ],
    'cluster' => [
        'host' => '192.168.1.100',
        'port' => 5672,
        'user' => 'admin',
        'password' => 'admin123',
        'vhost' => '/',
        'options' => [
            'max_reconnect_attempts' => 5,
            'reconnect_delay' => 2,
        ],
    ],
];

use ssh\Amqp\Client;

// 发送字符串消息(默认交换机,routing_key = 队列名)
Client::send('my_queue', 'Hello World!');

// 使用指定连接
Client::send('my_queue', 'Hello World!', 'consumer');

// 使用指定连接和配置
Client::send('my_queue', 'Hello World!', 'consumer', 'plugin.rabbitmq.rabbitmq');

// 发送到交换机
Client::send(
    'my_queue',                      // 队列名
    'Hello World!',                  // 消息内容
    'consumer',                      // 连接名
    'plugin.rabbitmq.rabbitmq',      // 配置名(可选)
    [],                              // 消息属性
    'my_exchange',                   // 交换机名
    'my_routing_key'                 // 路由键(可选,默认为队列名)
);

// 发送 JSON 数据
Client::send('my_queue', json_encode(['id' => 1, 'name' => 'test']));

// 带成功/失败回调
Client::send(
    'my_queue',
    'Hello World!',
    'consumer',
    'plugin.rabbitmq.rabbitmq',
    [],
    '',
    'my_queue',
    function($msg, $queue, $exchange, $routing_key) {
        echo "发送成功!\n";
        echo "消息: " . $msg->body . "\n";
    },
    function($e, $msg, $queue, $exchange, $routing_key) {
        echo "发送失败:" . $e->getMessage() . "\n";
    }
);

// 使用返回值判断
try {
    $result = Client::send('my_queue', 'Hello World!');
    if ($result) {
        echo "发送成功!\n";
    }
} catch (\Exception $e) {
    echo "发送失败:" . $e->getMessage() . "\n";
}

use ssh\Amqp\Client;

$client = Client::connection('consumer');

// 简单发送
$result = $client->publish('my_queue', '', 'my_routing_key', 'Hello AMQP!');
if ($result) {
    echo "发送成功!\n";
}

// 发送消息到指定交换机
$client->publish('my_queue', 'my_exchange', 'my_routing_key', 'Hello AMQP!');

// 带回调的发送
$client->publish(
    'my_queue',
    '',
    'my_routing_key',
    'Hello AMQP!',
    false,
    false,
    0,
    function($msg, $queue, $exchange, $routing_key) {
        echo "发送成功!\n";
    },
    function($e, $msg, $queue, $exchange, $routing_key) {
        echo "发送失败:" . $e->getMessage() . "\n";
    }
);

use ssh\Amqp\Client;
use PhpAmqpLib\Message\AMQPMessage;

$msg = new AMQPMessage('Hello World!', [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    'content_type' => 'application/json',
    'priority' => 5,
]);

$client = Client::connection('default');
$client->publish('my_queue', 'my_exchange', 'my_routing_key', $msg);


namespace app\amqp;

use ssh\Amqp\Client;
use ssh\Amqp\Consumer;
use PhpAmqpLib\Message\AMQPMessage;

class MyConsumer implements Consumer
{
    // 连接名称,默认 'default'
    public $connection = 'default';

    // 队列名称
    public $queue = 'my_queue';

    // 是否自动确认,false 表示需要手动 ack
    public $no_ack = false;

    // 预取消息数量
    public $prefetch_count = 1;

    /**
     * 消费消息
     *
     * @param string $data 消息内容
     * @param array $properties 消息属性
     * @param AMQPMessage $msg AMQP 消息对象
     * @param Client $client AMQP 客户端实例
     */
    public function consume($data, $properties, $msg, $client)
    {
        // 处理消息
        echo "Received: {$data}\n";

        // 手动确认消息
        $client->ack($msg);

        // 或者拒绝消息并重新入队
        // $client->nack($msg, false, true);
    }
}


namespace app\amqp;

use ssh\Amqp\Client;
use ssh\Amqp\Consumer;
use PhpAmqpLib\Message\AMQPMessage;

class ExchangeConsumer implements Consumer
{
    public $connection = 'default';

    // 交换机名称
    public $exchange = 'my_exchange';

    // 交换机类型:direct, fanout, topic
    public $exchange_type = 'direct';
    public $exchange_durable = true;

    // 队列名称
    public $queue = 'my_queue';
    public $queue_durable = true;

    // 路由键
    public $routing_key = 'my_routing_key';

    // 消息确认模式
    public $no_ack = false;

    // 预取数量
    public $prefetch_count = 10;

    public function consume($data, $properties, $msg, $client)
    {
        echo "Received: {$data}\n";

        // 处理业务逻辑
        try {
            // 处理成功,确认消息
            $client->ack($msg);
        } catch (\Exception $e) {
            // 处理失败,拒绝消息并重新入队
            $client->nack($msg, false, true);
        }
    }
}


return [
    // 消费者进程
    ssh\Amqp\Exception\Process\Consumer::class => [
        'consumer_dir' => base_path() . '/app/amqp', // 消费者类所在目录
    ],
];

interface Consumer
{
    /**
     * 返回队列名称
     *
     * @return string
     */
    public function queue();

    /**
     * 消费消息
     *
     * @param string $data 消息内容
     * @param array $properties 消息属性
     * @param AMQPMessage $msg AMQP 消息对象
     * @param Client $client AMQP 客户端
     */
    public function consume($data, $properties, $msg, $client);
}

use ssh\Amqp\Exception\Client;
use PhpAmqpLib\Exchange\AMQPExchangeType;

$client = Client::connection('default');

// 声明交换机
$client->declareExchange(
    'my_exchange',
    AMQPExchangeType::DIRECT,
    false,  // passive
    true,   // durable
    false   // auto_delete
);

// 声明队列
$client->declareQueue(
    'my_queue',
    false,  // passive
    true,   // durable
    false,  // exclusive
    false,  // nowait
    null    // arguments
);

// 绑定队列到交换机
$client->bindQueue(
    'my_queue',
    'my_exchange',
    'my_routing_key'
);

use ssh\Amqp\Exception\Client;

$client = Client::connection('default');

// 设置预取数量
$client->qos(0, 10);

// 开始消费
$client->consume('my_queue', '', false, false, false, false, function ($msg) {
    echo "Received: {$msg->body}\n";
});

// 保持运行
while (true) {
    $client->wait();
}

use ssh\Amqp\Exception\Client;
use ssh\Amqp\Exception\Exception\ConnectionException;
use ssh\Amqp\Exception\Exception\PublishException;

try {
    Client::send('my_queue', 'Hello World!');
} catch (ConnectionException $e) {
    // 处理连接异常
    echo "连接失败: " . $e->getMessage() . "\n";
    // 可以尝试重试或降级处理
} catch (PublishException $e) {
    // 处理发布异常
    echo "发布消息失败: " . $e->getMessage() . "\n";
}


namespace app\amqp;

use ssh\Amqp\Client;
use ssh\Amqp\Consumer;
use PhpAmqpLib\Message\AMQPMessage;

class MyConsumer implements Consumer
{
    public $connection = 'default';
    public $queue = 'my_queue';
    public $no_ack = false;
    public $prefetch_count = 1;

    /**
     * 判断处理失败的消息是否应该重新入队
     *
     * @param \Exception $e
     * @return bool
     */
    public function shouldRequeue(\Exception $e)
    {
        // 对于数据库连接失败等临时问题,重新入队
        if (strpos($e->getMessage(), 'connection') !== false) {
            return true;
        }

        // 对于其他异常(如格式错误),不重新入队
        return false;
    }

    public function consume($data, $properties, $msg, $client)
    {
        // 处理消息
        $client->ack($msg);
    }
}

use ssh\Amqp\Exception\Client;

$client = Client::connection('default');

// 检查连接是否正常
if ($client->isConnected()) {
    echo "连接正常\n";
}

// 手动重连
$client->reconnect();

// 关闭连接
$client->close();

use ssh\Amqp\Process\Consumer;
use Monolog\Logger;
use Monolog\Handler\StreamHandler;

$logger = new Logger('amqp');
$logger->pushHandler(new StreamHandler('path/to/log/file.log', Logger::DEBUG));

$consumer = new Consumer(base_path() . '/app/amqp');
$consumer->setLogger($logger);