PHP code example of jenner / kafka-php

1. Go to this page and download the library: Download jenner/kafka-php library. Choose the "require" download type.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

jenner / kafka-php example snippets

 php
// Obtain the producer for the given host list. The second argument (3000) is
// presumably a timeout -- TODO confirm its unit against \Kafka\Produce.
$produce = \Kafka\Produce::getInstance('localhost:2181', 3000);

// NOTE(review): -1 presumably maps to Kafka's "wait for all in-sync replicas"
// acknowledgement level -- confirm against the library.
$produce->setRequireAck(-1);

// One entry per setMessages() call: topic name, partition id, message list.
// The entries are registered in exactly this order.
$batches = [
    ['test', 0, ['test1111111']],
    ['test6', 0, ['test1111111']],
    ['test6', 2, ['test1111111']],
    ['test6', 1, ['test111111111133']],
];
foreach ($batches as $batch) {
    $produce->setMessages($batch[0], $batch[1], $batch[2]);
}

// Send everything registered above and dump whatever send() returns.
$result = $produce->send();
var_dump($result);

 php
// Obtain the consumer for the given host list and set its consumer group.
$consumer = \Kafka\Consumer::getInstance('localhost:2181');
$consumer->setGroup('testgroup');

// Select the partitions to read. The optional third argument (10) is
// presumably the starting offset -- TODO confirm against \Kafka\Consumer.
$consumer->setPartition('test', 0);
$consumer->setPartition('test6', 2, 10);

// The fetch result is iterated three levels deep: topics, then partitions,
// then the individual messages of each partition.
$result = $consumer->fetch();
foreach ($result as $topicName => $partitions) {
    foreach ($partitions as $partitionId => $messageSet) {
        var_dump($messageSet->getHighOffset());

        foreach ($messageSet as $message) {
            // Each message is dumped via its string conversion.
            var_dump((string) $message);
        }
    }
}
 php

// Produce-request payload: two messages for partition 0 of topic 'test'.
$data = array(
    // NOTE(review): the text at this position was a stray, unterminated quote
    // ("'   'data' => ..."), which made the whole snippet a parse error.
    // 'required_ack' => 1 is restored on the assumption that the garbled span
    // held this key: a Kafka broker sends no produce response when acks is 0,
    // and this example reads one below. Confirm the key name against
    // \Kafka\Protocol\Encoder::produceRequest.
    'required_ack' => 1,
    'data' => array(
        array(
            'topic_name' => 'test',
            'partitions' => array(
                array(
                    'partition_id' => 0,
                    'messages' => array(
                        'message1',
                        'message2',
                    ),
                ),
            ),
        ),
    ),
);

// Open a connection to the broker at localhost:9092.
$conn = new \Kafka\Socket('localhost', '9092');
$conn->connect();

// The encoder is bound to the connection and is handed the request payload.
$encoder = new \Kafka\Protocol\Encoder($conn);
$encoder->produceRequest($data);

// The decoder is bound to the same connection and yields the produce response.
$decoder = new \Kafka\Protocol\Decoder($conn);
$result = $decoder->produceResponse();
var_dump($result);

 php

// Fetch-request payload: topic 'test', partition 0, with 'offset' set to 0.
$data = [
    'data' => [
        [
            'topic_name' => 'test',
            'partitions' => [
                ['partition_id' => 0, 'offset' => 0],
            ],
        ],
    ],
];

// Open a connection to the broker at localhost:9092.
$conn = new \Kafka\Socket('localhost', '9092');
$conn->connect();

// Hand the payload to an encoder bound to the connection ...
$encoder = new \Kafka\Protocol\Encoder($conn);
$encoder->fetchRequest($data);

// ... then obtain the fetch response from a decoder on the same connection.
$decoder = new \Kafka\Protocol\Decoder($conn);
$result = $decoder->fetchResponse();
var_dump($result);

 php

// Offset-request payload for partition 0 of topic 'test'.
// NOTE(review): 'time' => -1 is presumably Kafka's "latest offset" sentinel and
// 'max_offset' the maximum number of offsets to return -- confirm.
$partitionQuery = [
    'partition_id' => 0,
    'max_offset' => 10,
    'time' => -1,
];
$data = [
    'data' => [
        [
            'topic_name' => 'test',
            'partitions' => [$partitionQuery],
        ],
    ],
];

// Open a connection to the broker at localhost:9092.
$conn = new \Kafka\Socket('localhost', '9092');
$conn->connect();

// Request side: the encoder is bound to the connection.
$encoder = new \Kafka\Protocol\Encoder($conn);
$encoder->offsetRequest($data);

// Response side: the decoder is bound to the same connection.
$decoder = new \Kafka\Protocol\Decoder($conn);
$result = $decoder->offsetResponse();
var_dump($result);

 php
// Parameter layout only (this statement has no effect when executed):
// a flat list whose entries are topic names.
[
    'topic_name1', // topic name
];
 php

// Metadata-request payload: a flat list of topic names, here just 'test'.
$data = ['test'];

// Open a connection to the broker at localhost:9092.
$conn = new \Kafka\Socket('localhost', '9092');
$conn->connect();

// Hand the topic list to an encoder bound to the connection.
$encoder = new \Kafka\Protocol\Encoder($conn);
$encoder->metadataRequest($data);

// Obtain the metadata response from a decoder on the same connection.
$decoder = new \Kafka\Protocol\Decoder($conn);
$result = $decoder->metadataResponse();
var_dump($result);

 php
// Commit-offset payload, built bottom-up: offset 2 for partition 0 of topic
// 'test', on behalf of consumer group 'testgroup'.
$partitionCommit = ['partition_id' => 0, 'offset' => 2];
$topicCommit = [
    'topic_name' => 'test',
    'partitions' => [$partitionCommit],
];
$data = [
    'group_id' => 'testgroup',
    'data' => [$topicCommit],
];

// Open a connection to the broker at localhost:9092.
$conn = new \Kafka\Socket('localhost', '9092');
$conn->connect();

// Request side: the encoder is bound to the connection.
$encoder = new \Kafka\Protocol\Encoder($conn);
$encoder->commitOffsetRequest($data);

// Response side: the decoder is bound to the same connection.
$decoder = new \Kafka\Protocol\Decoder($conn);
$result = $decoder->commitOffsetResponse();
var_dump($result);

 php
// Fetch-offset payload: names consumer group 'testgroup' and partition 0 of
// topic 'test'; no offset is supplied in the request itself.
$data = [
    'group_id' => 'testgroup',
    'data' => [
        [
            'topic_name' => 'test',
            'partitions' => [
                ['partition_id' => 0],
            ],
        ],
    ],
];

// Open a connection to the broker at localhost:9092.
$conn = new \Kafka\Socket('localhost', '9092');
$conn->connect();

// Hand the payload to an encoder bound to the connection ...
$encoder = new \Kafka\Protocol\Encoder($conn);
$encoder->fetchOffsetRequest($data);

// ... then obtain the response from a decoder on the same connection.
$decoder = new \Kafka\Protocol\Decoder($conn);
$result = $decoder->fetchOffsetResponse();
var_dump($result);