1. Go to this page and download the library: Download abbots/yii2-kafka library. Choose the download type require.
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
abbots / yii2-kafka example snippets
namespace console\services;
use Yii;
use RdKafka\Message;
use yii\base\Exception;
use yii\kafka\ConsumerInterface;
use common\extensions\helpers\StringHelper;
/**
* kafka公共逻辑基础类
* Class Service
*
* @package console\base
*/
class KafkaService implements ConsumerInterface
{
/**
* 业务方法配置(适配多业务处理)
* 必须在消费启动前配置
*
* @var string $businessMethod
*/
public $businessMethod='business';
/** @var array 消费信息(主题、分区...) */
public $consumerInfo = [];
/**
* 队列消费方法统一封装
*
* @param Message $message
* @return mixed
*/
public function execute(Message $message)
{
//TODO message数据处理
$data = $this->getData($message);
//调用实际业务处理
$this->{$this->businessMethod}($data);
}
/**
* 数据处理
*
* @param Message $message
*/
public function getData(Message $message)
{
// TODO message数据处理
//return $data;
}
/**
* 实际业务处理方法
* @param Message $message
*/
public function business(Message $message)
{
// TODO
}
}
namespace console\base;
use Yii;
use yii\kafka\Consumer;
use yii\base\Exception;
/**
* 控制台kafka消费控制器基础类
* Class KafkaController
*
* @package common\console\base
*/
class KafkaController extends \yii\console\Controller
{
/**
* 消费者对象
*
* @var Consumer $kafkaConsumer
*/
protected $kafkaConsumer = null;
/**
* @var array $consumer 消费组件配置
*
* 格式:[控制器方法名=>kafka消费组件名]
*/
protected $consumer = [];
public function beforeAction($action)
{
$result = parent::beforeAction($action);
$this->setKafkaConsumer();
return $result;
}
/**
* @throws Exception
* 消费对象创建
*/
protected function setKafkaConsumer()
{
$action = $this->action->id;
if (!isset($this->consumer[$action])) {
return false;
}
$this->kafkaConsumer = Yii::$app->{$this->consumer[$action]};
if (!($this->kafkaConsumer instanceof Consumer)) {
throw new Exception('消费组件配置错误');
}
//设置客户端ID
$params = Yii::$app->request->params;
if(isset($params[1]) && is_numeric($params[1])) {
$this->kafkaConsumer->client_id = $params[1];
}
}
/**
* 启动消费队列
*/
protected function kafkaStart()
{
$this->kafkaConsumer->start();
}
}
namespace console\modules\example\controllers;
use Yii;
use console\base\KafkaController;
class ExampleController extends KafkaController
{
public $consumer = ['index' => 'example'];
public function actionIndex()
{
$this->kafkaStart();
}
}
namespace common\components\kafka;
use Yii;
use yii\base\Component;
use yii\kafka\ExceptionNoticeInterface;
use console\components\behaviors\exceptionNotice\EmailNoticeBehavior;
/**
* kafka异常通知类
* Class ExceptionNotice
* @method mixed exceptionSend($message)
* @package common\components\kafka
*/
class ExceptionNotice extends Component implements ExceptionNoticeInterface
{
public function behaviors()
{
return [
[
'class'=>EmailNoticeBehavior::class,
'title' => '9K平台kafka消费队列异常',
]
];
}
public function send($message)
{
$this->exceptionSend($message);
}
}
Loading please wait ...
Before you can download the PHP files, the dependencies should be resolved. This can take some minutes. Please be patient.