PHP code example of abbots / yii2-kafka

1. Go to the download page and download the abbots/yii2-kafka library. Choose the download type "require".

2. Extract the ZIP file and open index.php.

3. Add the following code to index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

abbots / yii2-kafka example snippets



namespace console\services;

use Yii;
use RdKafka\Message;
use yii\base\Exception;
use yii\kafka\ConsumerInterface;
use common\extensions\helpers\StringHelper;

/**
 * Base class holding the logic shared by Kafka consumer services.
 *
 * Each consumed message enters through execute(), is converted by getData()
 * and is then handed to the method named by $businessMethod.
 *
 * @package console\services
 */
class KafkaService implements ConsumerInterface
{
    /**
     * Name of the method that performs the business processing, so that one
     * service can be adapted to several business flows.
     * Must be configured before consumption starts.
     *
     * @var string $businessMethod
     */
    public $businessMethod = 'business';

    /** @var array Consumption info (topic, partition, ...) */
    public $consumerInfo = [];

    /**
     * Common wrapper invoked for every consumed queue message.
     *
     * @param Message $message the message received from Kafka
     * @return mixed whatever the configured business method returns
     */
    public function execute(Message $message)
    {
        // Convert the raw message into the payload the business method expects.
        $payload = $this->getData($message);

        // Dispatch to the configured business method and propagate its result,
        // as promised by the documented "mixed" return value.
        return $this->{$this->businessMethod}($payload);
    }

    /**
     * Prepares the data handed to the business method.
     *
     * The default implementation passes the message through unchanged.
     * business() is type-hinted Message, so returning nothing here would make
     * execute() call it with null and fail with a TypeError.
     *
     * @param Message $message the message received from Kafka
     * @return mixed the value passed on to the business method
     */
    public function getData(Message $message)
    {
        // TODO: decode / transform the message here and return the result.
        return $message;
    }

    /**
     * Actual business processing; empty in this base class.
     *
     * @param Message $message the value produced by getData()
     * @return mixed
     */
    public function business(Message $message)
    {
        // TODO
    }
}



namespace console\base;

use Yii;
use yii\kafka\Consumer;
use yii\base\Exception;

/**
 * Base class for console controllers that consume Kafka queues.
 *
 * @package console\base
 */
class KafkaController extends \yii\console\Controller
{
    /**
     * Consumer object resolved for the current action, or null when the
     * action has no entry in $consumer.
     *
     * @var Consumer|null $kafkaConsumer
     */
    protected $kafkaConsumer = null;

    /**
     * @var array $consumer Consumer component configuration.
     *
     * Format: [action ID => name of the Kafka consumer application component]
     */
    protected $consumer = [];

    /**
     * Resolves the consumer for the action that is about to run.
     *
     * @param \yii\base\Action $action the action to be executed
     * @return bool whether the action should continue to run
     * @throws Exception if the mapped component is not a Consumer
     */
    public function beforeAction($action)
    {
        // Do not touch the consumer component when the parent vetoes the action.
        if (!parent::beforeAction($action)) {
            return false;
        }

        $this->setKafkaConsumer();

        return true;
    }

    /**
     * Creates the consumer object for the current action.
     *
     * @return bool true when a consumer was resolved, false when the current
     *              action has no entry in $consumer
     * @throws Exception if the mapped component is not a Consumer
     */
    protected function setKafkaConsumer()
    {
        $action = $this->action->id;

        if (!isset($this->consumer[$action])) {
            return false;
        }

        // Validate before storing, so $kafkaConsumer never holds a non-Consumer.
        $consumer = Yii::$app->{$this->consumer[$action]};
        if (!($consumer instanceof Consumer)) {
            throw new Exception('消费组件配置错误');
        }

        // Use the request parameter at index 1 as the client ID when it is numeric.
        $params = Yii::$app->request->params;
        if (isset($params[1]) && is_numeric($params[1])) {
            $consumer->client_id = $params[1];
        }

        $this->kafkaConsumer = $consumer;

        return true;
    }

    /**
     * Starts consuming the queue.
     *
     * @throws Exception if no consumer has been resolved for the current action
     */
    protected function kafkaStart()
    {
        // Fail with a clear message instead of a fatal "call to a member
        // function on null" when the action is missing from $consumer.
        if (!($this->kafkaConsumer instanceof Consumer)) {
            throw new Exception('Kafka consumer is not initialised for this action; check the $consumer mapping.');
        }

        $this->kafkaConsumer->start();
    }
}


namespace console\modules\example\controllers;

use Yii;
use console\base\KafkaController;

/**
 * Example console controller whose "index" action consumes a Kafka queue.
 */
class ExampleController extends KafkaController
{
    /**
     * @var array Maps an action ID to the name of the consumer component
     *            used by that action.
     */
    public $consumer = [
        'index' => 'example',
    ];

    /**
     * Starts the consumer mapped to the "index" action.
     */
    public function actionIndex()
    {
        $this->kafkaStart();
    }
}




namespace common\components\kafka;


use Yii;
use yii\base\Component;
use yii\kafka\ExceptionNoticeInterface;
use console\components\behaviors\exceptionNotice\EmailNoticeBehavior;

/**
 * Exception notifier for Kafka consumers.
 *
 * @method mixed exceptionSend($message)
 * @package common\components\kafka
 */
class ExceptionNotice extends Component implements ExceptionNoticeInterface
{
    /**
     * Declares the behaviors attached to this component: a single e-mail
     * notice behavior with a fixed title.
     *
     * @return array behavior configurations
     */
    public function behaviors()
    {
        $emailNotice = [
            'class' => EmailNoticeBehavior::class,
            'title' => '9K平台kafka消费队列异常',
        ];

        return [$emailNotice];
    }

    /**
     * Sends an exception notice.
     *
     * exceptionSend() is not defined in this class; presumably it is supplied
     * by the attached EmailNoticeBehavior (verify against that behavior).
     *
     * @param mixed $message the notice content, forwarded unchanged
     */
    public function send($message)
    {
        $this->exceptionSend($message);
    }
}