1. Go to this page and download the library: Download sai97/laravel-amqp library. Choose the download type "require".
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
// Method signatures a queue definition exposes. The enclosing interface/abstract
// class declaration is not part of this excerpt.

// Get the connection name
public function getConnectName(): string;
// Get the exchange name
public function getExchangeName(): string;
// Get the exchange type
public function getExchangeType(): string;
// Get the queue name
public function getQueueName(): string;
// Get the routing key
public function getRoutingKey(): string;
// Get the ContentType
public function getContentType(): string;
// Whether dead-letter mode is enabled
public function isDeadLetter(): bool;
// Get the dead-letter exchange name
public function getDeadLetterExchangeName(): string;
// Get the dead-letter routing key
public function getDeadLetterRoutingKey(): string;
// Get the dead-letter queue name
public function getDeadLetterQueueName(): string;
// Whether delayed tasks are enabled
public function isDelay(): bool;
// Get the expiry duration (TTL) of delayed tasks
// NOTE(review): the time unit is not visible here — confirm against the library
public function getDelayTTL(): int;
// Get the additional queue arguments
public function getQueueArgs(): array;
// Get the callback function
public function getCallback(): callable;
// Whether ACK is submitted automatically
public function isAutoAck(): bool;
namespace App\QueueJob;
use PhpAmqpLib\Message\AMQPMessage;
use Sai97\LaravelAmqp\Queue;
/**
 * Example queue definition: connection "default", queue "myQueue", and a
 * callback that prints each message and acknowledges it.
 */
class DefaultQueueJob extends Queue
{
    /**
     * Name of the connection this job uses.
     */
    public function getConnectName(): string
    {
        return "default";
    }

    /**
     * Name of the queue this job uses.
     */
    public function getQueueName(): string
    {
        return "myQueue";
    }

    /**
     * Build the message callback.
     *
     * The returned closure echoes a timestamped line containing the message
     * body and then calls ack() on the message.
     */
    public function getCallback(): callable
    {
        return function (AMQPMessage $message) {
            // date() uses the current time when no timestamp is passed.
            $nowDate = date("Y-m-d H:i:s");
            $line = "[{$nowDate}] The consumer message: {$message->body}" . PHP_EOL;

            echo $line;
            $message->ack();
        };
    }
}
// Payload to publish.
$message = "This is message...";

// Wrap the DefaultQueueJob definition in the queue service, then hand the
// payload to its producer() method.
$amqpQueueServices = new AmqpQueueServices(new DefaultQueueJob());
$amqpQueueServices->producer($message);
namespace App\Console\Commands;
use Illuminate\Console\Command;
use Sai97\LaravelAmqp\AmqpQueueServices;
/**
 * Artisan command that runs a consumer for one configured event.
 *
 * The {event} argument is looked up in the "amqp.event" config array. The
 * value found there is instantiated with `new` and passed to
 * AmqpQueueServices, whose consumer() method is then called.
 */
class RabbitMQWorker extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'rabbitmq:worker {event}';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'rabbitmq worker 消费进程';

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * Laravel uses the value returned from handle() as the process exit code,
     * so failures return a non-zero status instead of the null produced by
     * `return $this->error(...)`. Integer literals are used rather than the
     * SUCCESS/FAILURE constants so the command also works on framework
     * versions that do not define them.
     *
     * @return int 0 when consumer() returns normally; 1 when the event is not
     *             configured or any Throwable is raised.
     */
    public function handle()
    {
        try {
            $event = $this->argument("event");
            $eventConfig = config("amqp.event");

            // A missing key and an empty config value are both treated as "unknown event".
            $entity = $eventConfig[$event] ?? null;
            if (empty($entity)) {
                $this->error("未知的事件: {$event}");

                return 1;
            }

            $this->info("rabbitmq worker of event[{$event}] process start ...");

            $amqpQueueServices = new AmqpQueueServices(new $entity);
            $amqpQueueServices->consumer();
        } catch (\Throwable $throwable) {
            // $event is unset if argument() itself threw.
            $event = $event ?? "";
            $this->error($throwable->getFile() . " [{$throwable->getLine()}]");
            $this->error("rabbitmq worker of event[{$event}] process error:{$throwable->getMessage()}");

            return 1;
        }

        $this->info("rabbitmq worker of event[{$event}] process stop ...");

        return 0;
    }
}