PHP code example of zhan3333 / laravel-rabbitmq
1. Go to this page and download the library: Download zhan3333/laravel-rabbitmq library . Choose the download type require .
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
zhan3333 / laravel-rabbitmq example snippets
return [
'active' => env('RABBITMQ_ACTIVE', true),
// 任务队列配置
'task' => [
// 是否开启日志 true/false
'log_enable' => env('TASK_RABBITMQ_LOG_ENABLE', false),
// 日志使用通道
'log_channel' => env('TASK_RABBITMQ_LOG_CHANNEL', 'daily'),
// 队列配置
'host' => env('TASK_RABBITMQ_HOST', '127.0.0.1'),
'port' => env('TASK_RABBITMQ_PORT', 5672),
'user' => env('TASK_RABBITMQ_USER', 'user'),
'pwd' => env('TASK_RABBITMQ_PWD', 'pwd'),
'vhost' => env('TASK_RABBITMQ_VHOST', 'vhost'),
'exchange_name' => env('TASK_RABBITMQ_EXCHANGE_NAME', 'exchange'),
'queue_name' => env('TASK_RABBITMQ_QUEUE_NAME', 'task'),
'routing_key' => env('TASK_RABBITMQ_ROUTING_KEY', 'task'),
// consumer only
'consumer_tag' => env('TASK_RABBITMQ_CONSUMER_TAG', 'consumer_tag'),
'keepalive' => env('TASK_RABBITMQ_KEEPALIVE', false),
'heartbeat' => env('TASK_RABBITMQ_HEARTBEAT', 0),
'connection_timeout' => env('TASK_RABBITMQ_CONNECTION_TIMEOUT', 0),
'read_write_timeout' => env('TASK_RABBITMQ_READ_WRITE_TIMEOUT', 0),
'channel_rpc_timeout' => 0.0,
'ssl_protocol' => null,
'insist' => false,
'login_method' => 'AMQPLAIN',
'login_response' => null,
'locale' => 'en_US',
'context' => null,
],
];
use Zhan3333\RabbitMQ\Consumers\TaskConsumer;
function () {
app(TaskConsumer::class)
->after(function ($id, $task, $data) {
Log::debug('after', [$id, $task, $data]);
})
->before(function ($id, $task, $data) {
Log::debug('befor', [$id, $task, $data]);
})
->start();
}
use App\Exceptions\Handler;
use Zhan3333\RabbitMQ\Task\Dispatchable;
use Zhan3333\RabbitMQ\Task\Task;
class PrintDataTask extends Task
{
use Dispatchable;
public $data;
/**
* 使用时通过 construct 传入参数,并保存到类的成员变量中(public)
* PrintDataTask constructor.
* @param $data
*/
public function __construct($data)
{
$this->data = $data;
}
/**
* - 将在队列消费者中执行,支持依赖注入
* - 返回的结果将作为 finish 的第二个result参数
* - 在handle中产生或者抛出的异常将会传到 failed 中
* @return array
*/
public function handle()
{
\Log::debug(__CLASS__ . '\\' . __FUNCTION__, [$this->taskId, $this->data]);
return [
'handle result' => 'success',
'data' => $this->data,
];
}
/**
* handle处理完成后的处理
* @param $taskId
* @param $result
*/
public function finish($taskId, $result): void
{
\Log::debug(__CLASS__ . '\\' . __FUNCTION__, [$taskId, $result]);
}
/**
* 处理handle中的异常
* @param $taskId
* @param Throwable $exception
*/
public function failed($taskId, \Throwable $exception): void
{
if ($exception instanceof \Exception) {
// 可选, 打印错误到 laravel.log
app(Handler::class)->report($exception);
}
\Log::debug(__CLASS__ . '\\' . __FUNCTION__, [$taskId, $exception->getMessage()]);
}
}
// 返回唯一任务id
// 发送到队列中执行
$taskId = TaskDispatchTest::dispatch(['dispatch params' => ['foo' => 'bar', true => false,]])->taskId;
// 同步执行,但是操作上和异步完全一致,也返回taskId
$taskId = TaskDispatchTest::dispatchNow(['dispatch params' => ['foo' => 'bar', true => false,]])->taskId;
// 使用自定义的taskId
$taskId = '123456-789123';
TaskDispatchTest::dispatch(['dispatch params' => ['foo' => 'bar', true => false,]])->setTaskId($taskId);
php artisan rabbitmq:task-consumer
bash
php artisan rabbitmq:publisher --message=test --config=task