PHP code example of nanasen / php-rdkafka-class

1. Go to this page and download the library: Download nanasen/php-rdkafka-class library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

nanasen / php-rdkafka-class example snippets


# setTopic('qkl01', 0, $offset)  设置-1,从最后一次服务器记录一次消费开始消费
$offset = -1; //开始消费点
$consumer = new \Msg\Kafka\Consumer(['ip'=>'192.168.216.122']);
$consumer->setConsumerGroup('test-110-sxx1')
     ->setBrokerServer('192.168.216.122')
     ->setConsumerTopic()
     ->setTopic('qkl01', 0, $offset)
     ->subscribe(['qkl01'])
     ->consumer(function($msg){
         var_dump($msg);
     });

$config = [
    'ip'=>'127.0.0.1',
    'dr_msg_cb' => function($kafka, $message) {
        // var_dump((array)$message);
        //todo
        //do biz something, don't exit() or die()
    }
];
$producer = new \Msg\Kafka\Producer($config);
$topic = $producer->setBrokerServer()->setProducerTopic('test');
// $rst = $producer->setBrokerServer()
//                  ->setProducerTopic('test')
//                  ->producer('test', 90);
$start = microtime(true);
for($i = 0; $i < 10; ++$i){
    $topic->producer('test'.$i, 90);
}

while($producer->getOutQLength() > 0){
    $producer->poll(50);
}

function defaultDrMsg($kafka, $message) {
    file_put_contents($this->config['log_path'] . "/dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
}

function defaultErrorCb($kafka, $err, $reason) {
    file_put_contents($this->config['log_path'] . "/err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
}

function defaultRebalance(\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null)
{
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            if (is_null($this->getCurrentTopic())) {
                $kafka->assign();
            } else {
                $kafka->assign([
                    new \RdKafka\TopicPartition( $this->getCurrentTopic(), $this->getPartition($this->getCurrentTopic()), $this->getOffset($this->getCurrentTopic()) )
                ]);
            }
            break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            echo "Revoke: ";
            var_dump($partitions);
            $kafka->assign(NULL);
            break;

        default:
            throw new \Exception($err);
    }
}